// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>

#include <chrono>
#include <cstdint>
#include <limits>
#include <ranges>
#include <tuple>

#include "common/config.h"
#include "common/logging.h"
#include "common/stats.h"
#include "cpp/sync_point.h"
#include "meta-service/doris_txn.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/blob_message.h"
#include "meta-store/clone_chain_reader.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"

using namespace std::chrono;

namespace doris::cloud {

// Row-count statistics for a single table. calc_table_stats() fills a map of these
// keyed by table id.
struct TableStats {
    // Accumulated row count; starts at zero.
    int64_t updated_row_count = 0;

    TableStats() = default;

    // Left implicit on purpose so existing `TableStats s = n;` style uses keep compiling.
    TableStats(int64_t num_rows) : updated_row_count(num_rows) {}

    // Renders the stats as "updated_row_count: <value>".
    std::string to_string() const {
        std::ostringstream out;
        out << "updated_row_count: " << updated_row_count;
        return out.str();
    }
};

// Builds a `Container` from the [begin, end) iterator pair of `r`.
// `Container` must be constructible from two iterators of `Range`.
template <typename Container, typename Range>
static Container to_container(Range&& r) {
    auto first = r.begin();
    auto last = r.end();
    return Container(first, last);
}

// Copies the row count held in `stats` into the protobuf message `stats_pb`.
// `stats_pb` must be non-null; `stats` is not modified.
static void get_pb_from_tablestats(TableStats& stats, TableStatsPB* stats_pb) {
    const int64_t row_count = stats.updated_row_count;
    stats_pb->set_updated_row_count(row_count);
}

static void calc_table_stats(std::unordered_map<int64_t, TabletIndexPB>& tablet_ids,
                             std::unordered_map<int64_t, TabletStats>& tablet_stats,
                             std::map<int64_t, TableStats>& table_stats,
                             std::vector<int64_t> base_tablet_ids) {
    int64_t table_id;

    VLOG_DEBUG << "base_tablet_ids size: " << base_tablet_ids.size();
    for (int64_t tablet_id : base_tablet_ids) {
        auto it = tablet_stats.find(tablet_id);
        if (it == tablet_stats.end()) {
            continue;
        }
        const auto& tablet_stat = it->second;
        table_id = tablet_ids[tablet_id].table_id();
        if (table_stats.find(table_id) == table_stats.end()) {
            table_stats[table_id] = TableStats(tablet_stat.num_rows);
        } else {
            table_stats[table_id].updated_row_count += tablet_stat.num_rows;
        }
    }
}

// TODO: move begin/commit etc. txn handling into TxnManager
// Handles the BeginTxn RPC.
//
// Flow, as implemented below:
//   1. Validate the request (txn_info carrying label, db_id, table_ids and timeout_ms) and
//      resolve the instance id from cloud_unique_id.
//   2. In a first KV txn, rewrite the label key via atomic_set_ver_value() and commit.
//   3. In a second KV txn, read the label value back and derive txn_id from its trailing
//      VERSION_STAMP_LEN bytes.
//   4. If the label value already records txn ids, inspect the most recently appended one
//      and reject duplicated requests / labels that are still in use.
//   5. Write the label, txn info, txn index and txn running keys, commit, and return the
//      txn_id in the response.
//
// Failures are reported by setting `code` and `msg` and returning early. Those variables
// (and `ss`, `txn`, `stats`, `instance_id`) are not declared in this function; they are
// presumably introduced by RPC_PREPROCESS, whose definition is not visible here.
void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
                                const BeginTxnRequest* request, BeginTxnResponse* response,
                                ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(begin_txn, get, put);
    if (!request->has_txn_info()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid argument, missing txn info";
        return;
    }

    // NOTE(review): the request's TxnInfoPB is mutated in place through const_cast;
    // txn_id, status and prepare_time are set on it further down before it is serialized.
    auto& txn_info = const_cast<TxnInfoPB&>(request->txn_info());
    std::string label = txn_info.has_label() ? txn_info.label() : "";
    int64_t db_id = txn_info.has_db_id() ? txn_info.db_id() : -1;

    // label, db_id, at least one table id and a timeout are all mandatory.
    if (label.empty() || db_id < 0 || txn_info.table_ids().empty() || !txn_info.has_timeout_ms()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid argument, label=" << label << " db_id=" << db_id;
        msg = ss.str();
        return;
    }

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label;
        msg = ss.str();
        return;
    }

    RPC_RATE_LIMIT(begin_txn)
    // 1. Generate version stamp for txn id
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
           << " db_id=" << db_id;
        msg = ss.str();
        return;
    }

    const std::string label_key = txn_label_key({instance_id, db_id, label});
    std::string label_val;
    // TXN_KEY_NOT_FOUND is not an error here: it means the label has never been used.
    err = txn->get(label_key, &label_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "txn->get failed(), err=" << err << " label=" << label;
        msg = ss.str();
        return;
    }

    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;

    // err == OK means label has previous txn ids.
    // Drop the trailing VERSION_STAMP_LEN bytes of the stored value so that only the
    // payload is written back below.
    if (err == TxnErrorCode::TXN_OK) {
        label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
    }

    // Rewrite the label value. Presumably atomic_set_ver_value() appends a fresh version
    // stamp to the value at commit time -- the code below reads the trailing
    // VERSION_STAMP_LEN bytes back to obtain the txn id.
    txn->atomic_set_ver_value(label_key, label_val);
    LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);

    TEST_SYNC_POINT_CALLBACK("begin_txn:before:commit_txn:1", &label);
    err = txn->commit();
    TEST_SYNC_POINT_CALLBACK("begin_txn:after:commit_txn:1", &label);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "txn->commit failed(), label=" << label << " err=" << err;
        msg = ss.str();
        return;
    }
    // Accumulate the first txn's counters before resetting it; otherwise they are lost.
    stats.get_bytes += txn->get_bytes();
    stats.put_bytes += txn->put_bytes();
    stats.get_counter += txn->num_get_keys();
    stats.put_counter += txn->num_put_keys();
    // 2. Get txn id from version stamp
    txn.reset();

    err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn when get txn id, label=" << label << " err=" << err;
        msg = ss.str();
        return;
    }

    // The label key was just committed above, so here a missing key is an error too.
    label_val.clear();
    err = txn->get(label_key, &label_val);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "txn->get() failed, label=" << label << " err=" << err;
        msg = ss.str();
        return;
    }

    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;

    // Generated by TxnKv system
    // The txn id is decoded from the last VERSION_STAMP_LEN bytes of the label value.
    // NOTE(review): string_view::substr throws if label_val is shorter than
    // VERSION_STAMP_LEN; this assumes the value read back always carries a version stamp.
    int64_t txn_id = 0;
    int ret =
            get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
                                           label_val.size() - VERSION_STAMP_LEN, label_val.size()),
                                   &txn_id);
    if (ret != 0) {
        code = MetaServiceCode::TXN_GEN_ID_ERR;
        ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << ret;
        msg = ss.str();
        return;
    }

    LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " txn_id=" << txn_id
              << " label_val.size()=" << label_val.size();

    TxnLabelPB label_pb;
    if (label_val.size() > VERSION_STAMP_LEN) {
        //3. Check label
        //label_val.size() > VERSION_STAMP_LEN means label has previous txn ids.
        // Only the bytes in front of the version stamp form the serialized TxnLabelPB.
        if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "label_pb->ParseFromString() failed, txn_id=" << txn_id << " label=" << label;
            msg = ss.str();
            return;
        }

        // Check if label already used, by following steps
        // 1. get all existing transactions
        // 2. if there is a PREPARE transaction, check if this is a retry request.
        // 3. if there is a non-aborted transaction, throw label already used exception.
        //
        // The ids are walked in reverse, i.e. starting from the one appended last. Every
        // path through the loop body either returns or breaks, so in practice only that
        // most recently appended txn id is examined.

        for (auto it = label_pb.txn_ids().rbegin(); it != label_pb.txn_ids().rend(); ++it) {
            int64_t cur_txn_id = *it;
            const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
            std::string cur_info_val;
            err = txn->get(cur_info_key, &cur_info_val);
            if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
                code = cast_as<ErrCategory::READ>(err);
                ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
                   << " err=" << err;
                msg = ss.str();
                return;
            }

            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                // The label value references a txn id that has no txn info record:
                // label and txn info are inconsistent.
                code = MetaServiceCode::TXN_ID_NOT_FOUND;
                ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
                   << " err=" << err;
                msg = ss.str();
                return;
            }

            TxnInfoPB cur_txn_info;
            if (!cur_txn_info.ParseFromString(cur_info_val)) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id
                   << " label=" << label << " err=" << err;
                msg = ss.str();
                return;
            }

            VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
            LOG(INFO) << " size=" << label_pb.txn_ids().size()
                      << " status=" << cur_txn_info.status() << " txn_id=" << txn_id
                      << " label=" << label;
            // An aborted txn does not block reuse of the label, unless the label has
            // already accumulated config::max_num_aborted_txn (or more) txn ids.
            if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
                if (label_pb.txn_ids().size() >= config::max_num_aborted_txn) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    ss << "too many aborted txn for label=" << label << " txn_id=" << txn_id
                       << ", please check your data quality";
                    msg = ss.str();
                    LOG(WARNING) << msg << " label_pb=" << label_pb.ShortDebugString();
                    return;
                }
                break;
            }

            // A PREPARED/PRECOMMITTED txn carrying the same request id (hi and lo) is
            // treated as a retry of the same request: report the existing txn id.
            if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED ||
                cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED) {
                // clang-format off
                if (cur_txn_info.has_request_id() && txn_info.has_request_id() &&
                    ((cur_txn_info.request_id().hi() == txn_info.request_id().hi()) &&
                     (cur_txn_info.request_id().lo() == txn_info.request_id().lo()))) {

                    response->set_dup_txn_id(cur_txn_info.txn_id());
                    code = MetaServiceCode::TXN_DUPLICATED_REQ;
                    ss << "db_id=" << db_id << " label=" << label << " txn_id=" << cur_txn_info.txn_id() << " dup begin txn request.";
                    msg = ss.str();
                    return;
                }
                // clang-format on
            }
            // Any other case: the label is occupied by a non-aborted txn.
            response->set_txn_status(cur_txn_info.status());
            code = MetaServiceCode::TXN_LABEL_ALREADY_USED;
            ss << "Label [" << label << "] has already been used, relate to txn ["
               << cur_txn_info.txn_id() << "], status=[" << TxnStatusPB_Name(cur_txn_info.status())
               << "]";
            msg = ss.str();
            return;
        }
    }

    // Update txn_info to be put into TxnKv
    // Update txn_id in PB
    txn_info.set_txn_id(txn_id);
    // TODO:
    // check initial status must be TXN_STATUS_PREPARED or TXN_STATUS_UNKNOWN
    txn_info.set_status(TxnStatusPB::TXN_STATUS_PREPARED);

    // prepare_time is the current wall-clock time in milliseconds since the epoch.
    auto now_time = system_clock::now();
    uint64_t prepare_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();

    txn_info.set_prepare_time(prepare_time);
    //4. put txn info and db_tbl
    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
    std::string info_val;
    if (!txn_info.SerializeToString(&info_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_info, label=" << label << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // The txn index records the db_id for this txn_id (precommit_txn reads it back when
    // the request does not carry a db_id).
    const std::string index_key = txn_index_key({instance_id, txn_id});
    std::string index_val;
    TxnIndexPB index_pb;
    index_pb.mutable_tablet_index()->set_db_id(db_id);
    if (!index_pb.SerializeToString(&index_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_index_pb "
           << "label=" << label << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // The running record stores the table ids and the time at which the txn times out:
    // prepare_time + timeout_ms.
    const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
    std::string running_val;
    TxnRunningPB running_pb;
    running_pb.set_timeout_time(prepare_time + txn_info.timeout_ms());
    running_pb.mutable_table_ids()->CopyFrom(txn_info.table_ids());
    VLOG_DEBUG << "label=" << label << " txn_id=" << txn_id
               << "running_pb=" << running_pb.ShortDebugString();
    if (!running_pb.SerializeToString(&running_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize running_pb label=" << label << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // Append the new txn id to the label's id list and write the label value again.
    label_pb.add_txn_ids(txn_id);
    VLOG_DEBUG << "label=" << label << " txn_id=" << txn_id
               << "txn_label_pb=" << label_pb.ShortDebugString();
    if (!label_pb.SerializeToString(&label_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_label_pb label=" << label << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }
    txn->atomic_set_ver_value(label_key, label_val);
    LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key) << " label=" << label
              << " txn_id=" << txn_id;

    txn->put(info_key, info_val);
    txn->put(index_key, index_val);
    txn->put(running_key, running_val);
    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
    LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id;
    LOG(INFO) << "xxx put index_key=" << hex(index_key) << " txn_id=" << txn_id;

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit txn kv, label=" << label << " txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        return;
    }
    TEST_SYNC_POINT_CALLBACK("begin_txn:after:commit_txn:2", &txn_id);
    response->set_txn_id(txn_id);
}

// Handles the PrecommitTxn RPC: moves a transaction to TXN_STATUS_PRECOMMITTED.
//
// Steps:
//   1. Validate the request and resolve the instance id from cloud_unique_id.
//   2. If the request carries no db_id, look it up in the txn index record of txn_id.
//   3. Load the txn info. A txn that is already ABORTED, VISIBLE or PRECOMMITTED is
//      rejected with the matching error code and NOTHING is written.
//   4. Otherwise set the status to PRECOMMITTED, stamp precommit_time, copy the optional
//      commit attachment, and rewrite both the txn info and the txn running record
//      (timeout = precommit_time + precommit_timeout_ms) in one KV txn.
//
// Failures are reported by setting `code` and `msg` and returning early. Those variables
// (and `ss`, `txn`, `instance_id`) are presumably introduced by RPC_PREPROCESS, whose
// definition is not visible here.
void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controller,
                                    const PrecommitTxnRequest* request,
                                    PrecommitTxnResponse* response,
                                    ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(precommit_txn, get, put);
    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
    // NOTE(review): only the "both missing" case is rejected here. A request with a db_id
    // but no txn_id passes this check and later fails with TXN_ID_NOT_FOUND -- confirm
    // whether `txn_id < 0` alone should be treated as an invalid argument.
    if ((txn_id < 0 && db_id < 0)) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid argument, "
           << "txn_id=" << txn_id << " db_id=" << db_id;
        msg = ss.str();
        return;
    }

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }
    RPC_RATE_LIMIT(precommit_txn);
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // db_id not provided: read it from the txn index record.
    // (When it is provided, db_id already holds request->db_id().)
    if (db_id < 0) {
        const std::string index_key = txn_index_key({instance_id, txn_id});
        std::string index_val;
        err = txn->get(index_key, &index_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            ss << "failed to get db id with txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            return;
        }
        TxnIndexPB index_pb;
        if (!index_pb.ParseFromString(index_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_info"
               << " txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        DCHECK(index_pb.has_tablet_index() == true);
        DCHECK(index_pb.tablet_index().has_db_id() == true);
        db_id = index_pb.tablet_index().db_id();
        VLOG_DEBUG << " find db_id=" << db_id << " from index";
    }

    // Get txn info with db_id and txn_id
    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
    std::string info_val; // Will be reused when saving updated txn
    err = txn->get(info_key, &info_val);
    if (err != TxnErrorCode::TXN_OK) {
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                      : cast_as<ErrCategory::READ>(err);
        ss << "failed to get db id with db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        return;
    }

    TxnInfoPB txn_info;
    if (!txn_info.ParseFromString(info_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse txn_inf db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    DCHECK(txn_info.txn_id() == txn_id);
    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
        code = MetaServiceCode::TXN_ALREADY_ABORTED;
        ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
        code = MetaServiceCode::TXN_ALREADY_VISIBLE;
        ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        // Must stop here: falling through would overwrite the VISIBLE status with
        // PRECOMMITTED and rewrite the running record while reporting an error.
        return;
    }

    if (txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED) {
        code = MetaServiceCode::TXN_ALREADY_PRECOMMITED;
        ss << "transaction is already precommited: db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        // Must stop here: falling through would reset precommit_time and the running
        // record's timeout while reporting an error.
        return;
    }

    LOG(INFO) << "before update txn_info=" << txn_info.ShortDebugString();

    // Update txn_info
    txn_info.set_status(TxnStatusPB::TXN_STATUS_PRECOMMITTED);

    // precommit_time is the current wall-clock time in milliseconds since the epoch.
    auto now_time = system_clock::now();
    uint64_t precommit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
    txn_info.set_precommit_time(precommit_time);
    if (request->has_commit_attachment()) {
        txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
    }
    LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();

    info_val.clear();
    if (!txn_info.SerializeToString(&info_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    txn->put(info_key, info_val);
    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;

    // Rewrite the running record with a timeout based on the precommit time.
    const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
    std::string running_val;

    TxnRunningPB running_pb;
    running_pb.set_timeout_time(precommit_time + txn_info.precommit_timeout_ms());
    running_pb.mutable_table_ids()->CopyFrom(txn_info.table_ids());
    if (!running_pb.SerializeToString(&running_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize running_pb, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    txn->put(running_key, running_val);
    LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id;

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit txn kv, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        return;
    }
}

// Merges the routine-load progress carried by a commit request into the progress record
// stored for the job and stages the result in `txn` (the caller commits).
//
// - `code` / `msg`: set on failure. On success they are left untouched.
// - `request`: must carry a commit attachment; its rl_task_txn_commit_attachment supplies
//   the job id, the new partition offsets and the per-task statistics.
// - `txn`: KV transaction used to read the previous progress and to put the merged one.
//
// Merge rules:
// - Partition offsets: the request's offsets win; offsets that exist only in the stored
//   progress are carried over.
// - Statistics (filtered/loaded/unselected rows, received bytes, task execution time):
//   added to the stored values when the stored progress has a `stat`, otherwise taken
//   from the request as-is.
void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
                               const std::string& instance_id, const CommitTxnRequest* request,
                               Transaction* txn, int64_t db_id) {
    std::stringstream ss;
    const int64_t txn_id = request->txn_id();
    if (!request->has_commit_attachment()) {
        // Report the failure through `code` as well, so callers that only inspect `code`
        // see it (consistent with update_streaming_job_meta).
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "failed to get commit attachment from req, db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // Bind by reference: the attachment lives inside `request`, no deep copy is needed.
    const RLTaskTxnCommitAttachmentPB& commit_attachment =
            request->commit_attachment().rl_task_txn_commit_attachment();
    const int64_t job_id = commit_attachment.job_id();

    std::string rl_progress_key;
    std::string rl_progress_val;
    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
    const TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get routine load progress, db_id=" << db_id << " txn_id=" << txn_id
           << " err=" << err;
        msg = ss.str();
        return;
    }
    // A missing key simply means this is the first progress record for the job.
    const bool prev_progress_existed = (err == TxnErrorCode::TXN_OK);

    RoutineLoadProgressPB prev_progress_info;
    if (prev_progress_existed && !prev_progress_info.ParseFromString(rl_progress_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse routine load progress, db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    RoutineLoadProgressPB new_progress_info;
    new_progress_info.CopyFrom(commit_attachment.progress());
    // Carry over offsets of partitions the request does not mention.
    for (auto const& elem : prev_progress_info.partition_to_offset()) {
        auto it = new_progress_info.partition_to_offset().find(elem.first);
        if (it == new_progress_info.partition_to_offset().end()) {
            new_progress_info.mutable_partition_to_offset()->insert(elem);
        }
    }

    RoutineLoadJobStatisticPB* new_statistic_info = new_progress_info.mutable_stat();
    if (prev_progress_info.has_stat()) {
        const RoutineLoadJobStatisticPB& prev_statistic_info = prev_progress_info.stat();

        new_statistic_info->set_filtered_rows(prev_statistic_info.filtered_rows() +
                                              commit_attachment.filtered_rows());
        new_statistic_info->set_loaded_rows(prev_statistic_info.loaded_rows() +
                                            commit_attachment.loaded_rows());
        new_statistic_info->set_unselected_rows(prev_statistic_info.unselected_rows() +
                                                commit_attachment.unselected_rows());
        new_statistic_info->set_received_bytes(prev_statistic_info.received_bytes() +
                                               commit_attachment.received_bytes());
        new_statistic_info->set_task_execution_time_ms(
                prev_statistic_info.task_execution_time_ms() +
                commit_attachment.task_execution_time_ms());
    } else {
        new_statistic_info->set_filtered_rows(commit_attachment.filtered_rows());
        new_statistic_info->set_loaded_rows(commit_attachment.loaded_rows());
        new_statistic_info->set_unselected_rows(commit_attachment.unselected_rows());
        new_statistic_info->set_received_bytes(commit_attachment.received_bytes());
        new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms());
    }

    std::string new_progress_val;
    if (!new_progress_info.SerializeToString(&new_progress_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize new progress val, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    txn->put(rl_progress_key, new_progress_val);
    LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key)
              << " routine load new progress: " << new_progress_info.ShortDebugString();
}

// Folds the streaming-task statistics of a commit request into the streaming job record
// stored under streaming_job_key({instance_id, db_id, job_id}) and stages the result in
// `txn` (this function does not commit).
//
// - `code` / `msg`: set on failure; left untouched on success.
// - When a record already exists, scanned_rows / load_bytes / num_files / file_bytes are
//   added to the stored values; otherwise a new record is created from the request.
// - The offset is overwritten whenever the request's attachment carries one.
void update_streaming_job_meta(MetaServiceCode& code, std::string& msg,
                               const std::string& instance_id, const CommitTxnRequest* request,
                               Transaction* txn, int64_t db_id) {
    std::stringstream ss;
    const int64_t txn_id = request->txn_id();
    if (!request->has_commit_attachment()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "missing commit attachment, db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    const StreamingTaskCommitAttachmentPB& delta =
            request->commit_attachment().streaming_task_txn_commit_attachment();
    const int64_t job_id = delta.job_id();

    // Read the currently stored record; a missing key is fine and means "first commit".
    const std::string job_key = streaming_job_key({instance_id, db_id, job_id});
    std::string stored_val;
    const TxnErrorCode read_err = txn->get(job_key, &stored_val);
    if (read_err != TxnErrorCode::TXN_OK && read_err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(read_err);
        ss << "failed to get streaming job, db_id=" << db_id << " txn_id=" << txn_id
           << " err=" << read_err;
        msg = ss.str();
        return;
    }

    StreamingTaskCommitAttachmentPB merged;
    if (read_err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        // No previous record: start from the request's values.
        merged.set_job_id(delta.job_id());
        merged.set_scanned_rows(delta.scanned_rows());
        merged.set_load_bytes(delta.load_bytes());
        merged.set_num_files(delta.num_files());
        merged.set_file_bytes(delta.file_bytes());
    } else {
        if (!merged.ParseFromString(stored_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse streaming job meta, db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        // Accumulate on top of the stored counters.
        merged.set_scanned_rows(merged.scanned_rows() + delta.scanned_rows());
        merged.set_load_bytes(merged.load_bytes() + delta.load_bytes());
        merged.set_num_files(merged.num_files() + delta.num_files());
        merged.set_file_bytes(merged.file_bytes() + delta.file_bytes());
    }

    if (delta.has_offset()) {
        merged.set_offset(delta.offset());
    }

    std::string merged_val;
    if (!merged.SerializeToString(&merged_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize new streaming job val, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    txn->put(job_key, merged_val);
    LOG(INFO) << "put streaming_job_key key=" << hex(job_key)
              << " streaming job new meta: " << merged.ShortDebugString();
}

// Handles the GetRLTaskCommitAttach RPC: returns the stored routine-load progress of a job.
//
// The progress record is looked up by (instance_id, db_id, job_id), parsed into
// response->commit_attach().progress(), and -- when the record has a `stat` -- its
// counters are mirrored onto the commit attachment itself.
//
// Failures are reported by setting `code` and `msg` and returning early. Those variables
// (and `ss`, `txn`, `instance_id`) are presumably introduced by RPC_PREPROCESS, whose
// definition is not visible here.
void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller,
                                                const GetRLTaskCommitAttachRequest* request,
                                                GetRLTaskCommitAttachResponse* response,
                                                ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_rl_task_commit_attach, get);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(get_rl_task_commit_attach)

    const TxnErrorCode create_err = txn_kv_->create_txn(&txn);
    if (create_err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(create_err);
        ss << "filed to create txn, err=" << create_err;
        msg = ss.str();
        return;
    }

    // Both identifiers are required to build the progress key.
    const bool has_ids = request->has_db_id() && request->has_job_id();
    if (!has_ids) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty db_id or job_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }

    const int64_t db_id = request->db_id();
    const int64_t job_id = request->job_id();

    std::string progress_key;
    RLJobProgressKeyInfo progress_key_info {instance_id, db_id, job_id};
    rl_job_progress_key_info(progress_key_info, &progress_key);

    std::string progress_val;
    const TxnErrorCode read_err = txn->get(progress_key, &progress_val);
    if (read_err != TxnErrorCode::TXN_OK) {
        // A missing record gets its own code; every other failure is a read error.
        if (read_err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
            ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id
               << " err=" << read_err;
        } else {
            code = cast_as<ErrCategory::READ>(read_err);
            ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id
               << " err=" << read_err;
        }
        msg = ss.str();
        return;
    }

    // Parse straight into the response so no intermediate copy is needed.
    RLTaskTxnCommitAttachmentPB* attach = response->mutable_commit_attach();
    RoutineLoadProgressPB* progress = attach->mutable_progress();
    if (!progress->ParseFromString(progress_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse progress info, db_id=" << db_id << " job_id=" << job_id;
        msg = ss.str();
        return;
    }

    if (!progress->has_stat()) {
        return;
    }

    // Mirror the accumulated job statistics onto the attachment's top-level fields.
    const RoutineLoadJobStatisticPB& job_stat = progress->stat();
    attach->set_filtered_rows(job_stat.filtered_rows());
    attach->set_loaded_rows(job_stat.loaded_rows());
    attach->set_unselected_rows(job_stat.unselected_rows());
    attach->set_received_bytes(job_stat.received_bytes());
    attach->set_task_execution_time_ms(job_stat.task_execution_time_ms());
}

/// RPC handler that returns the commit attachment stored for a streaming job.
///
/// Reads the value stored under `streaming_job_key({instance_id, db_id, job_id})`
/// and parses it directly into `response->commit_attach`.
///
/// Result codes set by this function:
///   - INVALID_ARGUMENT: no instance id could be resolved from
///     `cloud_unique_id`, or the request lacks `db_id` / `job_id`.
///   - STREAMING_JOB_PROGRESS_NOT_FOUND: the key does not exist.
///   - PROTOBUF_PARSE_ERR: the stored value cannot be parsed.
///   - `cast_as<ErrCategory::CREATE>` / `cast_as<ErrCategory::READ>` of the
///     KV error for transaction-creation and read failures.
///
/// NOTE(review): `code`, `msg`, `ss`, `txn` and `instance_id` are not declared
/// in this function; presumably RPC_PREPROCESS introduces them and takes care
/// of finishing the RPC on return -- confirm against the macro definition.
void MetaServiceImpl::get_streaming_task_commit_attach(
        ::google::protobuf::RpcController* controller,
        const GetStreamingTaskCommitAttachRequest* request,
        GetStreamingTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_streaming_task_commit_attach, get);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(get_streaming_task_commit_attach)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, err=" << err;
        msg = ss.str();
        return;
    }

    if (!request->has_db_id() || !request->has_job_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty db_id or job_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }

    int64_t db_id = request->db_id();
    int64_t job_id = request->job_id();
    std::string streaming_job_val;
    std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
    err = txn->get(streaming_job_key_str, &streaming_job_val);
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        // A missing key is reported with a dedicated code so callers can tell
        // "no progress recorded yet" apart from a storage failure.
        code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
        ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
        msg = ss.str();
        return;
    } else if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id
           << " err=" << err;
        msg = ss.str();
        return;
    }

    // The stored value is the attachment message itself, so it is parsed
    // straight into the response field.
    StreamingTaskCommitAttachmentPB* commit_attach = response->mutable_commit_attach();
    if (!commit_attach->ParseFromString(streaming_job_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse meta info, db_id=" << db_id << " job_id=" << job_id;
        msg = ss.str();
        return;
    }
}

/// RPC handler that resets the stored progress of a routine load job.
///
/// Behaviour depends on `request->partition_to_offset()`:
///   - empty: the progress key for (instance_id, db_id, job_id) is removed;
///   - non-empty: a new RoutineLoadProgressPB is written whose
///     partition-to-offset map contains every entry from the request plus
///     every previously stored entry whose partition is not named in the
///     request (request entries win on conflict). Only the partition-to-offset
///     map is carried over into the new value.
///
/// Result codes set by this function:
///   - INVALID_ARGUMENT: no instance id resolved, or `db_id` / `job_id` missing.
///   - PROTOBUF_PARSE_ERR / PROTOBUF_SERIALIZE_ERR: stored value unparsable /
///     new value not serializable.
///   - `cast_as<ErrCategory::CREATE | READ | COMMIT>` of the KV error for
///     transaction creation, read and commit failures respectively.
///
/// NOTE(review): `code`, `msg`, `ss`, `txn` and `instance_id` are not declared
/// in this function; presumably RPC_PREPROCESS introduces them -- confirm.
void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller,
                                        const ResetRLProgressRequest* request,
                                        ResetRLProgressResponse* response,
                                        ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(reset_rl_progress, get, put, del);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(reset_rl_progress)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, err=" << err;
        msg = ss.str();
        return;
    }

    if (!request->has_db_id() || !request->has_job_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty db_id or job_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }

    int64_t db_id = request->db_id();
    int64_t job_id = request->job_id();
    std::string rl_progress_key;
    std::string rl_progress_val;
    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);

    if (request->partition_to_offset().empty()) {
        // No offsets supplied: drop the stored progress entirely.
        txn->remove(rl_progress_key);
        LOG(INFO) << "remove rl_progress_key key=" << hex(rl_progress_key);
    } else {
        // Load the previous progress, if any. A missing key is not an error:
        // `prev_progress_info` simply stays empty.
        RoutineLoadProgressPB prev_progress_info;
        err = txn->get(rl_progress_key, &rl_progress_val);
        if (err == TxnErrorCode::TXN_OK) {
            if (!prev_progress_info.ParseFromString(rl_progress_val)) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "failed to parse routine load progress, db_id=" << db_id
                   << " job_id=" << job_id;
                msg = ss.str();
                return;
            }
        } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get routine load progress, db_id=" << db_id << " job_id=" << job_id
               << " err=" << err;
            msg = ss.str();
            return;
        }

        // Start from the requested offsets, then keep previously stored
        // partitions that the request does not mention.
        RoutineLoadProgressPB new_progress_info;
        auto* merged_offsets = new_progress_info.mutable_partition_to_offset();
        for (auto const& elem : request->partition_to_offset()) {
            merged_offsets->insert(elem);
        }
        for (auto const& elem : prev_progress_info.partition_to_offset()) {
            if (merged_offsets->find(elem.first) == merged_offsets->end()) {
                merged_offsets->insert(elem);
            }
        }

        std::string new_progress_val;
        if (!new_progress_info.SerializeToString(&new_progress_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize new progress val, db_id=" << db_id << " job_id=" << job_id;
            msg = ss.str();
            return;
        }
        txn->put(rl_progress_key, new_progress_val);
        LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key);
    }

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        // Commit failures must be classified as COMMIT errors, not READ errors.
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit progress info, db_id=" << db_id << " job_id=" << job_id
           << " err=" << err;
        msg = ss.str();
        return;
    }
}

/// RPC handler that resets the stored offset of a streaming job.
///
/// Without `offset` in the request the job's key is removed. With `offset`,
/// a new StreamingTaskCommitAttachmentPB carrying the requested offset and
/// the job id is written; if a previous value exists, its scanned_rows,
/// load_bytes, num_files and file_bytes are copied into the new value.
///
/// NOTE(review): `code`, `msg`, `ss`, `txn` and `instance_id` are not declared
/// in this function; presumably RPC_PREPROCESS introduces them -- confirm.
void MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcController* controller,
                                                 const ResetStreamingJobOffsetRequest* request,
                                                 ResetStreamingJobOffsetResponse* response,
                                                 ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(reset_streaming_job_offset, get, put, del);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(reset_streaming_job_offset)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, err=" << err;
        msg = ss.str();
        return;
    }

    if (!(request->has_db_id() && request->has_job_id())) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty db_id or job_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }

    const int64_t db_id = request->db_id();
    const int64_t job_id = request->job_id();
    const std::string job_key = streaming_job_key({instance_id, db_id, job_id});

    if (request->has_offset()) {
        // An offset was provided: rewrite the stored progress with it.
        std::string stored_val;
        err = txn->get(job_key, &stored_val);
        const bool has_prev = (err == TxnErrorCode::TXN_OK);
        if (!has_prev && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get streaming job progress, db_id=" << db_id
               << " job_id=" << job_id << " err=" << err;
            msg = ss.str();
            return;
        }

        StreamingTaskCommitAttachmentPB old_info;
        if (has_prev && !old_info.ParseFromString(stored_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse streaming job offset, db_id=" << db_id
               << " job_id=" << job_id;
            msg = ss.str();
            return;
        }

        // New value: requested offset + job id, plus the old statistics when
        // a previous value was found.
        StreamingTaskCommitAttachmentPB updated_info;
        updated_info.set_offset(request->offset());
        updated_info.set_job_id(job_id);
        if (has_prev) {
            updated_info.set_scanned_rows(old_info.scanned_rows());
            updated_info.set_load_bytes(old_info.load_bytes());
            updated_info.set_num_files(old_info.num_files());
            updated_info.set_file_bytes(old_info.file_bytes());
        }

        std::string serialized;
        if (!updated_info.SerializeToString(&serialized)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize new streaming job val, db_id=" << db_id
               << " job_id=" << job_id;
            msg = ss.str();
            return;
        }

        txn->put(job_key, serialized);
        LOG(INFO) << "reset offset, put streaming_job_key key=" << hex(job_key)
                  << " prev job val: " << old_info.ShortDebugString()
                  << " new job val: " << updated_info.ShortDebugString();
    } else {
        // No offset provided: remove the streaming job progress.
        txn->remove(job_key);
        LOG(INFO) << "remove streaming_job_key key=" << hex(job_key);
    }

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit streaming job offset, db_id=" << db_id << " job_id=" << job_id
           << " err=" << err;
        msg = ss.str();
        return;
    }
}

/// RPC handler that deletes the stored state of a streaming job.
///
/// Removes the key `streaming_job_key({instance_id, db_id, job_id})` and
/// commits. The key is removed without being read first, so the handler does
/// not distinguish an existing key from a missing one.
///
/// Result codes set by this function:
///   - INVALID_ARGUMENT: no instance id resolved, or `db_id` / `job_id` missing.
///   - `cast_as<ErrCategory::CREATE | COMMIT>` of the KV error for
///     transaction creation and commit failures respectively.
///
/// NOTE(review): `code`, `msg`, `ss`, `txn` and `instance_id` are not declared
/// in this function; presumably RPC_PREPROCESS introduces them -- confirm.
void MetaServiceImpl::delete_streaming_job(::google::protobuf::RpcController* controller,
                                           const DeleteStreamingJobRequest* request,
                                           DeleteStreamingJobResponse* response,
                                           ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(delete_streaming_job, del);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(delete_streaming_job)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, err=" << err;
        msg = ss.str();
        return;
    }

    if (!request->has_db_id() || !request->has_job_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "missing db_id or job_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    int64_t db_id = request->db_id();
    int64_t job_id = request->job_id();
    std::string key_to_delete = streaming_job_key({instance_id, db_id, job_id});

    txn->remove(key_to_delete);
    LOG(INFO) << "remove key=" << hex(key_to_delete);

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        // Commit failures must be classified as COMMIT errors, not READ errors.
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit delete, err=" << err;
        msg = ss.str();
        return;
    }
}

/// Looks up the db id that a transaction belongs to.
///
/// Opens its own transaction on `txn_kv`, reads the txn index entry for
/// (`instance_id`, `txn_id`), parses it as TxnIndexPB and stores
/// `tablet_index().db_id()` into `*db_id`.
///
/// On failure `code` and `msg` are set, a warning is logged and `*db_id` is
/// left untouched. When `stats` is non-null, the bytes and key count read by
/// the transaction are added to it on exit (only once the transaction has
/// been created).
void get_txn_db_id(TxnKv* txn_kv, const std::string& instance_id, int64_t txn_id,
                   MetaServiceCode& code, std::string& msg, int64_t* db_id, KVStats* stats) {
    std::stringstream ss;
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    // Account for the reads of this transaction when leaving the function.
    DORIS_CLOUD_DEFER {
        if (stats && txn) {
            stats->get_bytes += txn->get_bytes();
            stats->get_counter += txn->num_get_keys();
        }
    };

    // Resolve txn id -> txn index entry.
    const std::string key = txn_index_key({instance_id, txn_id});
    std::string raw_index;
    err = txn->get(key, &raw_index);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    TxnIndexPB txn_index;
    const bool parsed = txn_index.ParseFromString(raw_index);
    if (!parsed) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    // The index entry is expected to carry a tablet index with a db id.
    DCHECK(txn_index.has_tablet_index() == true);
    DCHECK(txn_index.tablet_index().has_db_id() == true);
    *db_id = txn_index.tablet_index().db_id();
}

void scan_tmp_rowset(
        const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> txn_kv,
        MetaServiceCode& code, std::string& msg,
        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* tmp_rowsets_meta,
        KVStats* stats) {
    // Create a readonly txn for scan tmp rowset
    std::stringstream ss;
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    DORIS_CLOUD_DEFER {
        if (!stats || !txn) return;
        stats->get_bytes += txn->get_bytes();
        stats->get_counter += txn->num_get_keys();
    };

    // Get temporary rowsets involved in the txn
    // This is a range scan
    MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, txn_id, 0};
    MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, txn_id + 1, 0};
    std::string rs_tmp_key0;
    std::string rs_tmp_key1;
    meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
    meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);

    int num_rowsets = 0;
    DORIS_CLOUD_DEFER_COPY(rs_tmp_key0, rs_tmp_key1) {
        LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id << " num_rowsets=" << num_rowsets
                  << " range=[" << hex(rs_tmp_key0) << "," << hex(rs_tmp_key1) << ")";
    };

    std::unique_ptr<RangeGetIterator> it;
    do {
        err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
        if (err == TxnErrorCode::TXN_TOO_OLD) {
            err = txn_kv->create_txn(&txn);
            if (err == TxnErrorCode::TXN_OK) {
                err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
            }
        }
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "internal error, failed to get tmp rowset while committing, txn_id=" << txn_id
               << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();
            LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << " txn_id=" << txn_id;
            tmp_rowsets_meta->emplace_back();
            if (!tmp_rowsets_meta->back().second.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id
                   << " key=" << hex(k);
                msg = ss.str();
                LOG(WARNING) << msg;
                return;
            }
            // Save keys that will be removed later
            tmp_rowsets_meta->back().first = std::string(k.data(), k.size());
            ++num_rowsets;
            if (!it->has_next()) rs_tmp_key0 = k;
        }
        rs_tmp_key0.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());

    VLOG_DEBUG << "txn_id=" << txn_id << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta->size();
    return;
}

/// Applies the deltas in `stats` to the stored statistics of one tablet.
///
/// Two storage layouts are handled, selected by `config::split_tablet_stats`:
///   - split: each counter lives under its own key and is updated with
///     `atomic_add`. The data-size / row / segment / index-size / segment-size
///     counters are only touched when `stats.num_segs > 0`; the rowset counter
///     is always updated.
///   - aggregated: the TabletStatsPB stored under `stats_tablet_key(info)` is
///     read, every counter is incremented by the matching delta, and the
///     message is written back.
///
/// In the aggregated layout `code` / `msg` are set on failure:
/// TABLET_NOT_FOUND when the stats key is missing, a READ-category error for
/// other read failures, PROTOBUF_PARSE_ERR for an unparsable value and
/// PROTOBUF_SERIALIZE_ERR when the updated value cannot be serialized (in
/// which case nothing is written). The split layout never sets them here.
void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stats,
                         std::unique_ptr<Transaction>& txn, MetaServiceCode& code,
                         std::string& msg) {
    if (config::split_tablet_stats) {
        if (stats.num_segs > 0) {
            std::string data_size_key;
            stats_tablet_data_size_key(info, &data_size_key);
            txn->atomic_add(data_size_key, stats.data_size);
            std::string num_rows_key;
            stats_tablet_num_rows_key(info, &num_rows_key);
            txn->atomic_add(num_rows_key, stats.num_rows);
            std::string num_segs_key;
            stats_tablet_num_segs_key(info, &num_segs_key);
            txn->atomic_add(num_segs_key, stats.num_segs);
            std::string index_size_key;
            stats_tablet_index_size_key(info, &index_size_key);
            txn->atomic_add(index_size_key, stats.index_size);
            std::string segment_size_key;
            stats_tablet_segment_size_key(info, &segment_size_key);
            txn->atomic_add(segment_size_key, stats.segment_size);
        }
        std::string num_rowsets_key;
        stats_tablet_num_rowsets_key(info, &num_rowsets_key);
        txn->atomic_add(num_rowsets_key, stats.num_rowsets);
    } else {
        std::string key;
        stats_tablet_key(info, &key);
        std::string val;
        TxnErrorCode err = txn->get(key, &val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
                              std::get<4>(info));
            return;
        }
        TabletStatsPB stats_pb;
        if (!stats_pb.ParseFromString(val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = fmt::format("malformed tablet stats value, key={}", hex(key));
            return;
        }
        stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
        stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
        stats_pb.set_num_rowsets(stats_pb.num_rowsets() + stats.num_rowsets);
        stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs);
        stats_pb.set_index_size(stats_pb.index_size() + stats.index_size);
        stats_pb.set_segment_size(stats_pb.segment_size() + stats.segment_size);
        // Never write back a value whose serialization failed.
        if (!stats_pb.SerializeToString(&val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            msg = fmt::format("failed to serialize tablet stats, key={}", hex(key));
            return;
        }
        txn->put(key, val);
        LOG(INFO) << "put stats_tablet_key key=" << hex(key);
    }
}

// process mow table, check lock and remove pending key
void process_mow_when_commit_txn(
        const CommitTxnRequest* request, const std::string& instance_id, MetaServiceCode& code,
        std::string& msg, std::unique_ptr<Transaction>& txn,
        std::unordered_map<int64_t, std::vector<int64_t>>& table_id_tablet_ids) {
    int64_t txn_id = request->txn_id();
    std::stringstream ss;
    std::vector<std::string> lock_keys;
    lock_keys.reserve(request->mow_table_ids().size());
    for (auto table_id : request->mow_table_ids()) {
        lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}));
    }
    std::vector<std::optional<std::string>> lock_values;
    TxnErrorCode err = txn->batch_get(&lock_values, lock_keys);
    if (err != TxnErrorCode::TXN_OK) {
        ss << "failed to get delete bitmap update lock key info, instance_id=" << instance_id
           << " err=" << err;
        msg = ss.str();
        code = cast_as<ErrCategory::READ>(err);
        LOG(WARNING) << msg << " txn_id=" << txn_id;
        return;
    }
    size_t total_locks = lock_keys.size();
    for (size_t i = 0; i < total_locks; i++) {
        int64_t table_id = request->mow_table_ids(i);
        // When the key does not exist, it means the lock has been acquired
        // by another transaction and successfully committed.
        if (!lock_values[i].has_value()) {
            ss << "get delete bitmap update lock info, lock is expired"
               << " table_id=" << table_id << " key=" << hex(lock_keys[i]) << " txn_id=" << txn_id;
            code = MetaServiceCode::LOCK_EXPIRED;
            msg = ss.str();
            LOG(WARNING) << msg << " txn_id=" << txn_id;
            return;
        }

        DeleteBitmapUpdateLockPB lock_info;
        if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = "failed to parse DeleteBitmapUpdateLockPB";
            LOG(WARNING) << msg << " txn_id=" << txn_id;
            return;
        }
        if (lock_info.lock_id() != request->txn_id()) {
            ss << "lock is expired, locked by lock_id=" << lock_info.lock_id();
            msg = ss.str();
            code = MetaServiceCode::LOCK_EXPIRED;
            return;
        }
        txn->remove(lock_keys[i]);
        LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i])
                  << " table_id=" << table_id << " txn_id=" << txn_id;

        for (auto tablet_id : table_id_tablet_ids[table_id]) {
            std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});
            txn->remove(pending_key);
            LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key)
                      << " txn_id=" << txn_id;
        }
    }
    lock_keys.clear();
    lock_values.clear();
}

/// Reads the TabletIndexPB of every tablet in `tablet_ids`.
///
/// Builds one tablet index key per tablet id, fetches them with a single
/// `batch_get` on `txn` and stores the parsed messages into `*tablet_indexes`
/// keyed by tablet id.
///
/// @param snapshot  forwarded to `Transaction::BatchGetOptions`; defaults to
///                  false.
/// @return {OK, ""} on success; otherwise the first error encountered:
///         a READ-category code when `batch_get` fails, KV_TXN_GET_ERR when a
///         tablet index is missing, PROTOBUF_PARSE_ERR when a value cannot be
///         parsed. Entries emplaced before the error stay in `*tablet_indexes`.
std::pair<MetaServiceCode, std::string> get_tablet_indexes(
        Transaction* txn, std::unordered_map<int64_t, TabletIndexPB>* tablet_indexes,
        std::string_view instance_id, const std::vector<int64_t>& tablet_ids,
        bool snapshot = false) {
    std::vector<std::string> tablet_idx_keys;
    std::vector<std::optional<std::string>> tablet_idx_values;
    tablet_idx_keys.reserve(tablet_ids.size());
    tablet_idx_values.reserve(tablet_ids.size());

    for (int64_t tablet_id : tablet_ids) {
        tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, tablet_id}));
    }

    // Honour the caller's `snapshot` choice instead of always reading with
    // snapshot=false.
    TxnErrorCode err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
                                      Transaction::BatchGetOptions(snapshot));
    if (err != TxnErrorCode::TXN_OK) {
        auto msg = fmt::format("failed to get tablet table index ids, err={}", err);
        LOG_WARNING(msg);
        return {cast_as<ErrCategory::READ>(err), msg};
    }

    size_t total_tablets = tablet_idx_values.size();
    for (size_t i = 0; i < total_tablets; i++) {
        int64_t tablet_id = tablet_ids[i];
        if (!tablet_idx_values[i].has_value()) [[unlikely]] {
            // The value must exist.
            auto msg = fmt::format(
                    "failed to get tablet table index ids, err=not found tablet_id={} ", tablet_id);
            LOG_WARNING(msg).tag("err", err).tag("key", hex(tablet_idx_keys[i]));
            return {MetaServiceCode::KV_TXN_GET_ERR, msg};
        }

        TabletIndexPB tablet_index;
        if (!tablet_index.ParseFromString(tablet_idx_values[i].value())) [[unlikely]] {
            auto msg = fmt::format("malformed tablet index value tablet_id={} snapshot={}",
                                   tablet_id, snapshot);
            LOG_WARNING(msg).tag("key", hex(tablet_idx_keys[i]));
            return {MetaServiceCode::PROTOBUF_PARSE_ERR, msg};
        }

        VLOG_DEBUG << "tablet_id:" << tablet_id << " value:" << tablet_index.ShortDebugString();
        tablet_indexes->emplace(tablet_id, std::move(tablet_index));
    }

    return {MetaServiceCode::OK, ""};
}

/// Reads the current version of every partition in `partition_indexes`.
///
/// `partition_indexes` maps partition_id -> (db_id, table_id); one version key
/// is built per entry and all keys are fetched with a single `batch_get`.
///
/// For each partition:
///   - no stored value: the version is taken to be 1;
///   - stored value with a pending txn id: that id is written to
///     `*last_pending_txn_id` and the function returns {OK, ""} immediately,
///     leaving `*versions` only partially filled -- callers must check
///     `*last_pending_txn_id` before using `*versions`;
///   - otherwise the stored version is recorded in `*versions` and
///     `*last_pending_txn_id` is reset to 0.
///
/// @return {OK, ""} on success, a READ-category code when `batch_get` fails,
///         or PROTOBUF_PARSE_ERR when a stored value cannot be parsed.
std::pair<MetaServiceCode, std::string> get_partition_versions(
        Transaction* txn, std::unordered_map<int64_t, int64_t>* versions,
        int64_t* last_pending_txn_id, std::string_view instance_id,
        std::unordered_map<int64_t, std::tuple<int64_t, int64_t>>& partition_indexes) {
    std::vector<int64_t> partition_ids;
    std::vector<std::string> version_keys;
    std::vector<std::optional<std::string>> version_values;
    partition_ids.reserve(partition_indexes.size());
    version_keys.reserve(partition_indexes.size());
    version_values.reserve(partition_indexes.size());

    for (auto&& [partition_id, db_and_table] : partition_indexes) {
        auto [db_id, table_id] = db_and_table;
        std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id});
        version_keys.push_back(std::move(ver_key));
        partition_ids.push_back(partition_id);
    }

    TxnErrorCode err = txn->batch_get(&version_values, version_keys);
    if (err != TxnErrorCode::TXN_OK) {
        auto msg = fmt::format("failed to get partition versions, err={}", err);
        LOG_WARNING(msg);
        return {cast_as<ErrCategory::READ>(err), msg};
    }

    size_t total_versions = version_keys.size();
    for (size_t i = 0; i < total_versions; i++) {
        int64_t version;
        if (version_values[i].has_value()) {
            VersionPB version_pb;
            if (!version_pb.ParseFromString(version_values[i].value())) {
                auto msg = fmt::format("malformed version value, key={}", hex(version_keys[i]));
                LOG_WARNING(msg);
                return {MetaServiceCode::PROTOBUF_PARSE_ERR, msg};
            }
            if (version_pb.pending_txn_ids_size() > 0) {
                // Stop at the first partition that still has a pending txn.
                DCHECK(version_pb.pending_txn_ids_size() == 1);
                *last_pending_txn_id = version_pb.pending_txn_ids(0);
                DCHECK(*last_pending_txn_id > 0);
                return {MetaServiceCode::OK, ""};
            }
            version = version_pb.version();
        } else {
            version = 1;
        }
        VLOG_DEBUG << "get partition version, partition_id=" << partition_ids[i]
                   << " version=" << version << " key=" << hex(version_keys[i])
                   << " has_value=" << version_values[i].has_value();
        versions->emplace(partition_ids[i], version);
        // Write through the pointer. Assigning 0 to the pointer itself would
        // null it and make a later pending-txn write dereference null.
        *last_pending_txn_id = 0;
    }

    return {MetaServiceCode::OK, ""};
}

/**
 * 0. Extract txn_id from request
 * 1. Get db id from TxnKv with txn_id
 * 2. Get TxnInfo from TxnKv with db_id and txn_id
 * 3. Get tmp rowset meta; there may be several or even hundreds of tmp rowsets
 * 4. Get versions of each rowset
 * 5. Put rowset meta, which will be visible to user
 * 6. Put TxnInfo back into TxnKv with updated txn status (committed)
 * 7. Update versions of each partition
 * 8. Remove tmp rowset meta
 *
 * Note: getting the versions and making all changes happen in a single TxnKv transaction:
 *       step 5, 6, 7, 8
 */
void MetaServiceImpl::commit_txn_immediately(
        const CommitTxnRequest* request, CommitTxnResponse* response, MetaServiceCode& code,
        std::string& msg, const std::string& instance_id, int64_t db_id,
        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
        TxnErrorCode& err, KVStats& stats) {
    std::stringstream ss;
    int64_t txn_id = request->txn_id();

    bool is_versioned_write = is_version_write_enabled(instance_id);
    bool is_versioned_read = is_version_read_enabled(instance_id);
    do {
        TEST_SYNC_POINT_CALLBACK("commit_txn_immediately:begin", &txn_id);
        int64_t last_pending_txn_id = 0;
        std::unique_ptr<Transaction> txn;
        err = txn_kv_->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
        DORIS_CLOUD_DEFER {
            if (txn == nullptr) return;
            stats.get_bytes += txn->get_bytes();
            stats.put_bytes += txn->put_bytes();
            stats.del_bytes += txn->delete_bytes();
            stats.get_counter += txn->num_get_keys();
            stats.put_counter += txn->num_put_keys();
            stats.del_counter += txn->num_del_keys();
        };

        // Get txn info with db_id and txn_id
        std::string info_val; // Will be reused when saving updated txn
        const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
        err = txn->get(info_key, &info_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
            } else {
                ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
                   << " err=" << err;
            }
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        TxnInfoPB txn_info;
        if (!txn_info.ParseFromString(info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        // TODO: do more check like txn state, 2PC etc.
        DCHECK(txn_info.txn_id() == txn_id);
        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
            code = MetaServiceCode::TXN_ALREADY_ABORTED;
            ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
            if (request->has_is_2pc() && request->is_2pc()) {
                code = MetaServiceCode::TXN_ALREADY_VISIBLE;
                ss << "transaction [" << txn_id << "] is already visible, not pre-committed.";
                msg = ss.str();
                LOG(INFO) << msg;
                response->mutable_txn_info()->CopyFrom(txn_info);
                return;
            }
            code = MetaServiceCode::OK;
            ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(INFO) << msg;
            response->mutable_txn_info()->CopyFrom(txn_info);
            return;
        }

        if (request->has_is_2pc() && request->is_2pc() &&
            txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
            code = MetaServiceCode::TXN_INVALID_STATUS;
            ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id"
               << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();

        CloneChainReader meta_reader(instance_id, resource_mgr_.get());

        // Prepare rowset meta and new_versions
        AnnotateTag txn_tag("txn_id", txn_id);
        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
        auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
                std::ranges::ref_view(tmp_rowsets_meta) |
                std::ranges::views::transform(
                        [](const auto& pair) { return pair.second.tablet_id(); }));
        if (!is_versioned_read) {
            std::tie(code, msg) =
                    get_tablet_indexes(txn.get(), &tablet_ids, instance_id, acquired_tablet_ids);
            if (code != MetaServiceCode::OK) {
                return;
            }
        } else {
            err = meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids, &tablet_ids);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("failed to get tablet indexes, err={}", err);
                LOG_WARNING(msg);
                return;
            }
        }

        std::unordered_map<int64_t, std::tuple<int64_t, int64_t>> partition_indexes;
        for (auto& [_, i] : tmp_rowsets_meta) {
            int64_t tablet_id = i.tablet_id();
            int64_t table_id = tablet_ids[tablet_id].table_id();
            int64_t partition_id = i.partition_id();
            partition_indexes.insert({partition_id, {db_id, table_id}});
        }

        // {table/partition} -> version
        std::unordered_map<int64_t, int64_t> versions;
        if (!is_versioned_read) {
            std::tie(code, msg) = get_partition_versions(txn.get(), &versions, &last_pending_txn_id,
                                                         instance_id, partition_indexes);
            if (code != MetaServiceCode::OK) {
                return;
            }
        } else {
            std::vector<int64_t> partition_ids = to_container<std::vector<int64_t>>(
                    std::ranges::ref_view(partition_indexes) | std::ranges::views::keys);
            err = meta_reader.get_partition_versions(txn.get(), partition_ids, &versions,
                                                     &last_pending_txn_id);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("failed to get partition versions, err={}", err);
                LOG_WARNING(msg);
                return;
            }
        }

        if (last_pending_txn_id > 0) {
            stats.get_bytes += txn->get_bytes();
            stats.get_counter += txn->num_get_keys();
            txn.reset();
            TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::advance_last_pending_txn_id",
                                     &last_pending_txn_id);
            std::shared_ptr<TxnLazyCommitTask> task =
                    txn_lazy_committer_->submit(instance_id, last_pending_txn_id);

            std::tie(code, msg) = task->wait();
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
            last_pending_txn_id = 0;
            continue;
        }

        record_txn_commit_stats(txn.get(), instance_id, partition_indexes.size(), tablet_ids.size(),
                                txn_id);

        CommitTxnLogPB commit_txn_log;
        commit_txn_log.set_txn_id(txn_id);
        commit_txn_log.set_db_id(db_id);

        // <tablet_id, version> -> rowset meta
        std::vector<std::pair<std::tuple<int64_t, int64_t>, const RowsetMetaCloudPB&>> rowsets;
        std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
        rowsets.reserve(tmp_rowsets_meta.size());

        int64_t rowsets_visible_ts_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

        for (auto& [_, i] : tmp_rowsets_meta) {
            int64_t tablet_id = i.tablet_id();
            int64_t partition_id = i.partition_id();
            if (!versions.contains(partition_id)) [[unlikely]] {
                // it is impossible.
                code = MetaServiceCode::UNDEFINED_ERR;
                ss << "failed to get partition version key, the target version not exists in "
                      "versions."
                   << " txn_id=" << txn_id << " partition_id=" << partition_id;
                ss << " versions";
                for (const auto& [pid, ver] : versions) {
                    ss << " partition_id=" << pid << " version=" << ver;
                }
                msg = ss.str();
                LOG(ERROR) << msg;
                return;
            }

            // Update rowset version
            int64_t new_version = versions[partition_id] + 1;
            i.set_start_version(new_version);
            i.set_end_version(new_version);
            i.set_visible_ts_ms(rowsets_visible_ts_ms);

            // Accumulate affected rows
            auto& stats = tablet_stats[tablet_id];
            stats.data_size += i.total_disk_size();
            stats.num_rows += i.num_rows();
            ++stats.num_rowsets;
            stats.num_segs += i.num_segments();
            stats.index_size += i.index_disk_size();
            stats.segment_size += i.data_disk_size();

            commit_txn_log.mutable_tablet_to_partition_map()->insert({tablet_id, partition_id});
            commit_txn_log.mutable_partition_version_map()->insert({partition_id, new_version});

            rowsets.emplace_back(std::make_tuple(tablet_id, i.end_version()), i);
        } // for tmp_rowsets_meta

        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
        for (auto& [tablet_id, tablet_index] : tablet_ids) {
            int64_t table_id = tablet_index.table_id();
            table_id_tablet_ids[table_id].push_back(tablet_id);
        }
        process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
        if (code != MetaServiceCode::OK) {
            LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
            return;
        }

        // Save rowset meta
        for (auto& i : rowsets) {
            auto [tablet_id, version] = i.first;
            std::string rowset_key = meta_rowset_key({instance_id, tablet_id, version});
            std::string val;
            if (!i.second.SerializeToString(&val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }
            size_t rowset_size = rowset_key.size() + val.size();
            txn->put(rowset_key, val);
            LOG(INFO) << "put rowset_key=" << hex(rowset_key) << " txn_id=" << txn_id
                      << " rowset_size=" << rowset_size;

            if (is_versioned_write) {
                std::string versioned_rowset_key =
                        versioned::meta_rowset_load_key({instance_id, tablet_id, version});
                RowsetMetaCloudPB copied_rowset_meta(i.second);
                if (!versioned::document_put(txn.get(), versioned_rowset_key,
                                             std::move(copied_rowset_meta))) {
                    code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                    ss << "failed to put versioned rowset meta, txn_id=" << txn_id
                       << " key=" << hex(versioned_rowset_key);
                    msg = ss.str();
                    LOG(WARNING) << msg;
                    return;
                }
                LOG(INFO) << "put versioned rowset meta key=" << hex(versioned_rowset_key)
                          << ", txn_id=" << txn_id;
            }
        }

        // Save versions
        int64_t version_update_time_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        response->set_version_update_time_ms(version_update_time_ms);
        for (auto& [partition_id, version] : versions) {
            int64_t new_version = version + 1;
            std::string ver_val;
            VersionPB version_pb;
            version_pb.set_version(new_version);
            version_pb.set_update_time_ms(version_update_time_ms);
            if (!version_pb.SerializeToString(&ver_val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize version_pb when saving, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }

            auto [db_id, table_id] = partition_indexes[partition_id];
            auto version_key = partition_version_key({instance_id, db_id, table_id, partition_id});
            txn->put(version_key, ver_val);
            LOG(INFO) << "put partition_version_key=" << hex(version_key)
                      << " version:" << new_version << " txn_id=" << txn_id
                      << " partition_id=" << partition_id
                      << " update_time=" << version_update_time_ms;

            VLOG_DEBUG << " table_id=" << table_id << " partition_id=" << partition_id;

            if (is_versioned_write) {
                std::string partition_version_key =
                        versioned::partition_version_key({instance_id, partition_id});
                versioned_put(txn.get(), partition_version_key, ver_val);
                LOG(INFO) << "put versioned partition key=" << hex(partition_version_key)
                          << ", txn_id=" << txn_id;
            }

            response->add_table_ids(table_id);
            response->add_partition_ids(partition_id);
            response->add_versions(new_version);
        }

        // Save table versions
        for (auto& i : table_id_tablet_ids) {
            if (is_versioned_read) {
                // Read the table version, to build the operation log visible version range.
                err = meta_reader.get_table_version(txn.get(), i.first, nullptr, true);
                if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
                    code = cast_as<ErrCategory::READ>(err);
                    ss << "failed to get table version, err=" << err << " table_id=" << i.first;
                    msg = ss.str();
                    LOG(WARNING) << msg;
                    return;
                }
            }
            update_table_version(txn.get(), instance_id, db_id, i.first);
            commit_txn_log.add_table_ids(i.first);
        }

        LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();

        // Update txn_info
        txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);

        auto now_time = system_clock::now();
        uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
        if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
            LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
                      << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
            return;
        }
        txn_info.set_commit_time(commit_time);
        txn_info.set_finish_time(commit_time);
        if (request->has_commit_attachment()) {
            txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
        }

        txn_info.set_versioned_write(is_versioned_write);
        txn_info.set_versioned_read(is_versioned_read);

        LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
        info_val.clear();
        if (!txn_info.SerializeToString(&info_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        txn->put(info_key, info_val);
        LOG(INFO) << "put info_key=" << hex(info_key) << " txn_id=" << txn_id;

        // Batch get existing versioned tablet stats if needed
        std::unordered_map<int64_t, TabletStatsPB> existing_versioned_stats;
        if (is_versioned_write && !tablet_stats.empty()) {
            internal_get_load_tablet_stats_batch(code, msg, meta_reader, txn.get(), instance_id,
                                                 tablet_ids, &existing_versioned_stats);
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "batch get versioned tablet stats failed, code=" << code
                             << " msg=" << msg << " txn_id=" << txn_id;
                return;
            }
            LOG(INFO) << "batch get " << existing_versioned_stats.size()
                      << " versioned tablet stats, txn_id=" << txn_id;
        }

        // Update stats of affected tablet
        for (auto& [tablet_id, stats] : tablet_stats) {
            DCHECK(tablet_ids.count(tablet_id));
            auto& tablet_idx = tablet_ids[tablet_id];
            StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                     tablet_idx.partition_id(), tablet_id};
            update_tablet_stats(info, stats, txn, code, msg);
            if (code != MetaServiceCode::OK) return;

            if (is_versioned_write) {
                TabletStatsPB stats_pb = existing_versioned_stats[tablet_id];
                merge_tablet_stats(stats_pb, stats);
                std::string stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
                if (!versioned::document_put(txn.get(), stats_key, std::move(stats_pb))) {
                    code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                    msg = "failed to serialize versioned tablet stats";
                    LOG(WARNING) << msg << " tablet_id=" << tablet_id << " txn_id=" << txn_id;
                    return;
                }
                LOG(INFO) << "put versioned tablet stats key=" << hex(stats_key)
                          << " tablet_id=" << tablet_id << " txn_id=" << txn_id;
            }
        }
        // Remove tmp rowset meta
        for (auto& [k, _] : tmp_rowsets_meta) {
            txn->remove(k);
            LOG(INFO) << "remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id;
        }

        const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
        LOG(INFO) << "remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
        txn->remove(running_key);

        RecycleTxnPB recycle_pb;
        recycle_pb.set_creation_time(commit_time);
        recycle_pb.set_label(txn_info.label());

        if (is_versioned_write) {
            commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
            std::string log_key = versioned::log_key({instance_id});
            OperationLogPB operation_log;
            if (is_versioned_read) {
                operation_log.set_min_timestamp(meta_reader.min_read_version());
            }
            operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
            versioned::blob_put(txn.get(), log_key, operation_log);
            LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
                      << " txn_id=" << txn_id;
        } else {
            std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
            std::string recycle_val;
            if (!recycle_pb.SerializeToString(&recycle_val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }
            txn->put(recycle_key, recycle_val);
            LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
                      << " txn_id=" << txn_id;
        }

        if (txn_info.load_job_source_type() ==
            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
            put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
        }

        if (txn_info.load_job_source_type() ==
            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
            update_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
        }

        LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
                  << " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys()
                  << " num_del_keys=" << txn->num_del_keys()
                  << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;

        TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_immediately::before_commit", &err, &code);
        // Finally we are done...
        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            if (err == TxnErrorCode::TXN_CONFLICT) {
                g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter << 1;
            }
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            return;
        }

        // calculate table stats from tablets stats
        std::map<int64_t /*table_id*/, TableStats> table_stats;
        std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
                                             request->base_tablet_ids().end());
        calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
        for (const auto& pair : table_stats) {
            TableStatsPB* stats_pb = response->add_table_stats();
            auto table_id = pair.first;
            auto stats = pair.second;
            get_pb_from_tablestats(stats, stats_pb);
            stats_pb->set_table_id(table_id);
            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
                       << " table_id=" << table_id
                       << " updated_row_count=" << stats_pb->updated_row_count();
        }
        response->mutable_txn_info()->CopyFrom(txn_info);
        TEST_SYNC_POINT_CALLBACK("commit_txn_immediately::finish", &code);
        break;
    } while (true);
} // end commit_txn_immediately

// Rewrites the stored TabletIndexPB of every tablet referenced by `tmp_rowsets_meta`
// so that it carries `db_id`; for historical reasons some stored values lack it.
//
// The tablet index keys are handled in batches of at most
// config::max_tablet_index_num_per_batch keys. Every batch is read, patched and
// committed in its own KV transaction, so batches committed before a later failure
// stay committed.
//
// When `is_versioned_write` is set, the versioned tablet index key and the versioned
// tablet inverted index key are written as well for every repaired tablet.
//
// On success `code` is set to MetaServiceCode::OK. On the first failure `code` and
// `msg` describe the error and the function returns immediately.
void repair_tablet_index(
        std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& msg,
        const std::string& instance_id, int64_t db_id, int64_t txn_id,
        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
        bool is_versioned_write) {
    std::stringstream ss;
    std::vector<std::string> tablet_idx_keys;
    tablet_idx_keys.reserve(tmp_rowsets_meta.size());
    for (auto& [_, i] : tmp_rowsets_meta) {
        tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()}));
    }

    for (size_t i = 0; i < tablet_idx_keys.size(); i += config::max_tablet_index_num_per_batch) {
        // [i, end) is the slice of keys handled by this batch.
        size_t end = (i + config::max_tablet_index_num_per_batch) > tablet_idx_keys.size()
                             ? tablet_idx_keys.size()
                             : i + config::max_tablet_index_num_per_batch;
        const std::vector<std::string> sub_tablet_idx_keys(tablet_idx_keys.begin() + i,
                                                           tablet_idx_keys.begin() + end);

        std::unique_ptr<Transaction> txn;
        TxnErrorCode err = txn_kv->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        std::vector<std::optional<std::string>> tablet_idx_values;
        // batch get snapshot is false
        err = txn->batch_get(&tablet_idx_values, sub_tablet_idx_keys,
                             Transaction::BatchGetOptions(false));
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get tablet table index ids, err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg << " txn_id=" << txn_id;
            return;
        }
        DCHECK(tablet_idx_values.size() <= config::max_tablet_index_num_per_batch);

        // `j` indexes into the current batch, so every key lookup below must go through
        // sub_tablet_idx_keys; indexing tablet_idx_keys with `j` would name the wrong key
        // for every batch after the first one.
        for (size_t j = 0; j < sub_tablet_idx_keys.size(); j++) {
            if (!tablet_idx_values[j].has_value()) [[unlikely]] {
                // The value must existed
                code = MetaServiceCode::KV_TXN_GET_ERR;
                ss << "failed to get tablet table index ids, err=not found"
                   << " key=" << hex(sub_tablet_idx_keys[j]);
                msg = ss.str();
                LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
                return;
            }
            TabletIndexPB tablet_idx_pb;
            if (!tablet_idx_pb.ParseFromString(tablet_idx_values[j].value())) [[unlikely]] {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "malformed tablet index value key=" << hex(sub_tablet_idx_keys[j])
                   << " txn_id=" << txn_id;
                msg = ss.str();
                LOG(WARNING) << msg;
                return;
            }

            // Only values that lack db_id are rewritten; the others are left untouched.
            if (!tablet_idx_pb.has_db_id()) {
                tablet_idx_pb.set_db_id(db_id);
                std::string idx_val;
                if (!tablet_idx_pb.SerializeToString(&idx_val)) {
                    code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                    ss << "failed to serialize tablet index value key="
                       << hex(sub_tablet_idx_keys[j]) << " txn_id=" << txn_id;
                    msg = ss.str();
                    LOG(WARNING) << msg;
                    return;
                }
                txn->put(sub_tablet_idx_keys[j], idx_val);
                LOG(INFO) << " repair tablet index txn_id=" << txn_id
                          << " tablet_idx_pb:" << tablet_idx_pb.ShortDebugString()
                          << " key=" << hex(sub_tablet_idx_keys[j]);
                if (is_versioned_write) {
                    std::string versioned_tablet_idx_key =
                            versioned::tablet_index_key({instance_id, tablet_idx_pb.tablet_id()});
                    std::string versioned_tablet_inverted_idx_key =
                            versioned::tablet_inverted_index_key(
                                    {instance_id, db_id, tablet_idx_pb.table_id(),
                                     tablet_idx_pb.index_id(), tablet_idx_pb.partition_id(),
                                     tablet_idx_pb.tablet_id()});
                    txn->put(versioned_tablet_idx_key, idx_val);
                    // The inverted index entry is stored with an empty value.
                    txn->put(versioned_tablet_inverted_idx_key, "");
                    LOG(INFO) << "repair tablet index and inverted index, txn_id=" << txn_id
                              << " tablet_id=" << tablet_idx_pb.tablet_id()
                              << " index_key=" << hex(versioned_tablet_idx_key)
                              << " inverted_index_key=" << hex(versioned_tablet_inverted_idx_key);
                }
            }
        }

        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
    }
    code = MetaServiceCode::OK;
}

/**
 * Commits a txn in "eventually" (lazy) mode.
 *
 * Within a single KV transaction this function:
 *   - marks the txn_info as TXN_STATUS_COMMITTED (not VISIBLE),
 *   - records txn_id as a pending txn in the VersionPB of every touched partition,
 *   - calls process_mow_when_commit_txn and update_table_version for every touched table,
 *   - when versioned write is enabled, writes a commit-txn operation log.
 * After that KV transaction commits, the txn is submitted to the lazy committer and waited
 * on. A failure of the lazy commit task is only logged; the response is still filled with
 * the table stats and a txn_info whose status is set to TXN_STATUS_VISIBLE.
 *
 * The body is a retry loop. It starts over with a fresh KV transaction when
 *   - some tablet index has no db_id (repair_tablet_index is run first), or
 *   - a partition still carries a pending txn id (that txn is advanced through the lazy
 *     committer first).
 *
 * @param request          the commit request; txn_id, is_2pc, commit_attachment and
 *                         base_tablet_ids are read here
 * @param response         receives version_update_time_ms, table/partition ids and versions,
 *                         table stats and the resulting txn_info
 * @param code             out: set to the failure code on every error return
 * @param msg              out: human readable message accompanying `code`
 * @param instance_id      instance the txn belongs to
 * @param db_id            database the txn belongs to
 * @param tmp_rowsets_meta <key, rowset meta> pairs of the tmp rowsets written by this txn
 * @param stats            accumulates the KV read/write counters of the transactions used
 */
void MetaServiceImpl::commit_txn_eventually(
        const CommitTxnRequest* request, CommitTxnResponse* response, MetaServiceCode& code,
        std::string& msg, const std::string& instance_id, int64_t db_id,
        const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
        KVStats& stats) {
    StopWatch sw;
    // Deferred: report the elapsed time of this call to the per-instance metric.
    DORIS_CLOUD_DEFER {
        if (config::use_detailed_metrics && !instance_id.empty()) {
            g_bvar_ms_commit_txn_eventually.put(instance_id, sw.elapsed_us());
        }
    };

    std::stringstream ss;
    TxnErrorCode err = TxnErrorCode::TXN_OK;
    int64_t txn_id = request->txn_id();

    bool is_versioned_write = is_version_write_enabled(instance_id);
    bool is_versioned_read = is_version_read_enabled(instance_id);

    do {
        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually:begin", &txn_id);
        int64_t last_pending_txn_id = 0;
        std::unique_ptr<Transaction> txn;
        err = txn_kv_->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
        // Deferred: fold this KV txn's counters into `stats`. The retry paths below reset
        // `txn` after accounting for its reads themselves, hence the nullptr guard.
        DORIS_CLOUD_DEFER {
            if (txn == nullptr) return;
            stats.get_bytes += txn->get_bytes();
            stats.put_bytes += txn->put_bytes();
            stats.del_bytes += txn->delete_bytes();
            stats.get_counter += txn->num_get_keys();
            stats.put_counter += txn->num_put_keys();
            stats.del_counter += txn->num_del_keys();
        };

        CloneChainReader meta_reader(instance_id, resource_mgr_.get());

        AnnotateTag txn_tag("txn_id", txn_id);

        // tablet_id -> {table/index/partition}_id
        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
        auto acquired_tablet_ids = to_container<std::vector<int64_t>>(
                std::ranges::ref_view(tmp_rowsets_meta) |
                std::ranges::views::transform(
                        [](const auto& pair) { return pair.second.tablet_id(); }));
        if (!is_versioned_read) {
            std::tie(code, msg) =
                    get_tablet_indexes(txn.get(), &tablet_ids, instance_id, acquired_tablet_ids);
            if (code != MetaServiceCode::OK) {
                return;
            }
        } else {
            err = meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids, &tablet_ids);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("failed to get tablet indexes, err={}", err);
                LOG_WARNING(msg);
                return;
            }
        }

        // A tablet index without db_id has to be repaired before the commit can proceed.
        bool need_repair_tablet_idx =
                std::any_of(tablet_ids.begin(), tablet_ids.end(),
                            [](const auto& pair) { return !pair.second.has_db_id(); });

        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::need_repair_tablet_idx",
                                 &need_repair_tablet_idx);
        if (need_repair_tablet_idx) {
            // Account for the reads done so far, drop the txn, repair, then retry the loop.
            stats.get_bytes += txn->get_bytes();
            stats.get_counter += txn->num_get_keys();
            txn.reset();
            repair_tablet_index(txn_kv_, code, msg, instance_id, db_id, txn_id, tmp_rowsets_meta,
                                is_versioned_write);
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "repair_tablet_index failed, txn_id=" << txn_id << " code=" << code;
                return;
            }
            continue;
        }

        CommitTxnLogPB commit_txn_log;
        commit_txn_log.set_txn_id(txn_id);
        commit_txn_log.set_db_id(db_id);

        // partition_id -> <db_id, table_id>
        std::unordered_map<int64_t, std::tuple<int64_t, int64_t>> partition_indexes;
        for (auto& [_, rowset_meta] : tmp_rowsets_meta) {
            int64_t tablet_id = rowset_meta.tablet_id();
            int64_t table_id = tablet_ids[tablet_id].table_id();
            int64_t partition_id = rowset_meta.partition_id();
            partition_indexes.insert({partition_id, {db_id, table_id}});
            commit_txn_log.mutable_tablet_to_partition_map()->insert({tablet_id, partition_id});
        }

        // partition_id -> current version
        std::unordered_map<int64_t, int64_t> versions;
        if (!is_versioned_read) {
            std::tie(code, msg) = get_partition_versions(txn.get(), &versions, &last_pending_txn_id,
                                                         instance_id, partition_indexes);
            if (code != MetaServiceCode::OK) {
                return;
            }
        } else {
            std::vector<int64_t> partition_ids = to_container<std::vector<int64_t>>(
                    std::ranges::ref_view(partition_indexes) | std::ranges::views::keys);
            err = meta_reader.get_partition_versions(txn.get(), partition_ids, &versions,
                                                     &last_pending_txn_id);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("failed to get partition versions, err={}", err);
                LOG_WARNING(msg);
                return;
            }
        }

        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::last_pending_txn_id",
                                 &last_pending_txn_id);

        if (last_pending_txn_id > 0) {
            TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::advance_last_pending_txn_id",
                                     &last_pending_txn_id);
            // Account for the reads done so far and drop the txn before waiting on the
            // lazy committer to advance the pending txn.
            stats.get_bytes += txn->get_bytes();
            stats.get_counter += txn->num_get_keys();
            txn.reset();
            std::shared_ptr<TxnLazyCommitTask> task =
                    txn_lazy_committer_->submit(instance_id, last_pending_txn_id);

            std::tie(code, msg) = task->wait();
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }

            last_pending_txn_id = 0;
            // There may be concurrent commit_txn_eventually calls, so retry the loop to make
            // sure the partition VersionPB carries no pending txn_id.
            continue;
        }

        record_txn_commit_stats(txn.get(), instance_id, partition_indexes.size(), tablet_ids.size(),
                                txn_id);

        std::string info_val;
        const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
        err = txn->get(info_key, &info_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
               << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        TxnInfoPB txn_info;
        if (!txn_info.ParseFromString(info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
        LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();

        DCHECK(txn_info.txn_id() == txn_id);
        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
            code = MetaServiceCode::TXN_ALREADY_ABORTED;
            ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
            // Already visible: an error for a 2pc request, otherwise the commit is treated
            // as already done and the stored txn_info is returned.
            if (request->has_is_2pc() && request->is_2pc()) {
                code = MetaServiceCode::TXN_ALREADY_VISIBLE;
                ss << "transaction [" << txn_id << "] is already visible, not pre-committed.";
                msg = ss.str();
                LOG(INFO) << msg;
                response->mutable_txn_info()->CopyFrom(txn_info);
                return;
            }
            code = MetaServiceCode::OK;
            ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(INFO) << msg;
            response->mutable_txn_info()->CopyFrom(txn_info);
            return;
        }

        if (request->has_is_2pc() && request->is_2pc() &&
            txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
            code = MetaServiceCode::TXN_INVALID_STATUS;
            ss << "transaction is prepare, not pre-committed: db_id=" << db_id
               << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        // Reject the commit when prepare_time + timeout_ms is already in the past.
        auto now_time = system_clock::now();
        uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
        if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
            LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
                      << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
            return;
        }
        txn_info.set_commit_time(commit_time);
        txn_info.set_finish_time(commit_time);
        if (request->has_commit_attachment()) {
            txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
        }
        DCHECK(txn_info.status() != TxnStatusPB::TXN_STATUS_COMMITTED);
        // set status TXN_STATUS_COMMITTED not TXN_STATUS_VISIBLE !!!
        // lazy commit task will advance txn to make txn visible
        txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED);

        txn_info.set_versioned_write(is_versioned_write);
        txn_info.set_versioned_read(is_versioned_read);

        LOG(INFO) << "after update txn_id= " << txn_id
                  << " txn_info=" << txn_info.ShortDebugString();
        info_val.clear();
        if (!txn_info.SerializeToString(&info_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
            msg = ss.str();
            return;
        }

        txn->put(info_key, info_val);
        LOG(INFO) << "put info_key=" << hex(info_key) << " txn_id=" << txn_id;

        if (txn_info.load_job_source_type() ==
            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
            put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id);
            // Stop here on failure, as the streaming-job branch below does, instead of
            // carrying the error forward until a later, unrelated check reports it.
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "put_routine_load_progress failed, txn_id=" << txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
        }

        if (txn_info.load_job_source_type() ==
            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
            update_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id);
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "update_streaming_job_meta failed, txn_id=" << txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
        }

        // save versions for partition
        int64_t version_update_time_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        response->set_version_update_time_ms(version_update_time_ms);
        for (auto& [partition_id, version] : versions) {
            // The stored VersionPB keeps the current version and lists this txn as pending;
            // the version itself is written only when it is greater than 1.
            std::string ver_val;
            VersionPB version_pb;
            version_pb.add_pending_txn_ids(txn_id);
            version_pb.set_update_time_ms(version_update_time_ms);
            if (version > 1) {
                version_pb.set_version(version);
            }

            if (!version_pb.SerializeToString(&ver_val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize version_pb when saving, txn_id=" << txn_id
                   << " partition_id=" << partition_id;
                msg = ss.str();
                return;
            }

            auto [index_db_id, table_id] = partition_indexes[partition_id];
            std::string version_key =
                    partition_version_key({instance_id, index_db_id, table_id, partition_id});

            txn->put(version_key, ver_val);
            LOG(INFO) << "put partition_version_key=" << hex(version_key) << " version:" << version
                      << " txn_id=" << txn_id << " partition_id=" << partition_id
                      << " update_time=" << version_update_time_ms;

            VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
                       << " partition_id=" << partition_id << " version=" << version;

            if (is_versioned_write) {
                std::string versioned_partition_key =
                        versioned::partition_version_key({instance_id, partition_id});
                versioned_put(txn.get(), versioned_partition_key, ver_val);
                LOG(INFO) << "put versioned partition key=" << hex(versioned_partition_key)
                          << ", txn_id=" << txn_id;
            }

            // The response and the operation log carry the version this txn will produce.
            int64_t new_version = version + 1;
            commit_txn_log.mutable_partition_version_map()->insert({partition_id, new_version});

            response->add_table_ids(table_id);
            response->add_partition_ids(partition_id);
            response->add_versions(new_version);
        }

        // table_id -> tablets_ids
        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
        for (auto& [tablet_id, tablet_idx] : tablet_ids) {
            table_id_tablet_ids[tablet_idx.table_id()].push_back(tablet_id);
        }
        process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
        if (code != MetaServiceCode::OK) {
            LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
            return;
        }

        // Save table versions
        for (auto& i : table_id_tablet_ids) {
            if (is_versioned_read) {
                // Read the table version, to build the operation log visible version range.
                err = meta_reader.get_table_version(txn.get(), i.first, nullptr, true);
                if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
                    code = cast_as<ErrCategory::READ>(err);
                    ss << "failed to get table version, err=" << err << " table_id=" << i.first;
                    msg = ss.str();
                    LOG(WARNING) << msg;
                    return;
                }
            }
            update_table_version(txn.get(), instance_id, db_id, i.first);
            commit_txn_log.add_table_ids(i.first);
        }

        if (is_versioned_write) {
            RecycleTxnPB* recycle_txn = commit_txn_log.mutable_recycle_txn();
            recycle_txn->set_label(txn_info.label());
            std::string log_key = versioned::log_key({instance_id});
            OperationLogPB operation_log;
            if (is_versioned_read) {
                operation_log.set_min_timestamp(meta_reader.min_read_version());
            }
            // commit_txn_log is swapped into the operation log and must not be used afterwards.
            operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
            versioned::blob_put(txn.get(), log_key, operation_log);
            LOG(INFO) << "put commit txn operation log, key=" << hex(log_key)
                      << " txn_id=" << txn_id;
        }

        VLOG_DEBUG << "put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes()
                   << " num_put_keys=" << txn->num_put_keys()
                   << " num_del_keys=" << txn->num_del_keys()
                   << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;

        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            if (err == TxnErrorCode::TXN_CONFLICT) {
                g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter << 1;
            }
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            return;
        }

        // The txn is now COMMITTED in the KV store; hand it to the lazy committer and wait.
        TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_eventually::txn_lazy_committer_submit",
                                         &txn_id);
        std::shared_ptr<TxnLazyCommitTask> task = txn_lazy_committer_->submit(instance_id, txn_id);
        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::txn_lazy_committer_wait", &txn_id);
        std::pair<MetaServiceCode, std::string> ret = task->wait();
        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::task->wait", &ret);
        if (ret.first != MetaServiceCode::OK) {
            // Only logged: `code` and `msg` are left untouched by a lazy commit failure.
            LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " code=" << ret.first
                         << " msg=" << ret.second;
        }

        std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
        for (auto& [_, rowset_meta] : tmp_rowsets_meta) {
            // Accumulate affected rows
            auto& tablet_stat = tablet_stats[rowset_meta.tablet_id()];
            tablet_stat.data_size += rowset_meta.total_disk_size();
            tablet_stat.num_rows += rowset_meta.num_rows();
            ++tablet_stat.num_rowsets;
            tablet_stat.num_segs += rowset_meta.num_segments();
            tablet_stat.index_size += rowset_meta.index_disk_size();
            tablet_stat.segment_size += rowset_meta.data_disk_size();
        }

        // calculate table stats from tablets stats
        std::map<int64_t /*table_id*/, TableStats> table_stats;
        std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
                                             request->base_tablet_ids().end());
        calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
        for (const auto& pair : table_stats) {
            TableStatsPB* stats_pb = response->add_table_stats();
            auto table_id = pair.first;
            auto table_stat = pair.second;
            get_pb_from_tablestats(table_stat, stats_pb);
            stats_pb->set_table_id(table_id);
            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
                       << " table_id=" << table_id
                       << " updated_row_count=" << stats_pb->updated_row_count();
        }

        // txn set visible for fe callback
        txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
        response->mutable_txn_info()->CopyFrom(txn_info);
        TEST_SYNC_POINT_CALLBACK("commit_txn_eventually::finish", &code, &txn_id);
        break;
    } while (true);
}

/**
 * This process is generally the same as commit_txn; the difference is that
 * a partition's version is incremented by 1 for every sub txn that writes to it.
 *
 * One example:
 *  Suppose the table, partition, tablet and version info is:
 *  --------------------------------------------
 *  | table | partition | tablet    | version |
 *  --------------------------------------------
 *  | t1    | t1_p1     | t1_p1.1   | 1       |
 *  | t1    | t1_p1     | t1_p1.2   | 1       |
 *  | t1    | t1_p2     | t1_p2.1   | 2       |
 *  | t2    | t2_p3     | t2_p3.1   | 3       |
 *  | t2    | t2_p4     | t2_p4.1   | 4       |
 *  --------------------------------------------
 *
 *  Now we commit a txn with 3 sub txns and the tablets are:
 *    sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
 *    sub_txn2: t2_p3.1
 *    sub_txn3: t1_p1.1, t1_p1.2
 *  When commit, the partitions version will be:
 *    sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
 *    sub_txn2: t2_p3(3 -> 4)
 *    sub_txn3: t1_p1(2 -> 3)
 *  After commit, the partitions version will be:
 *    t1: t1_p1(3), t1_p2(3)
 *    t2: t2_p3(4), t2_p4(4)
 */
void MetaServiceImpl::commit_txn_with_sub_txn(const CommitTxnRequest* request,
                                              CommitTxnResponse* response, MetaServiceCode& code,
                                              std::string& msg, const std::string& instance_id,
                                              int64_t db_id, KVStats& stats) {
    std::stringstream ss;
    int64_t txn_id = request->txn_id();
    auto sub_txn_infos = request->sub_txn_infos();
    std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
            sub_txn_to_tmp_rowsets_meta;
    for (const auto& sub_txn_info : sub_txn_infos) {
        auto sub_txn_id = sub_txn_info.sub_txn_id();
        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta;
        scan_tmp_rowset(instance_id, sub_txn_id, txn_kv_, code, msg, &tmp_rowsets_meta, &stats);
        if (code != MetaServiceCode::OK) {
            LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id
                         << ", sub_txn_id=" << sub_txn_id << " code=" << code;
            return;
        }
        sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id, std::move(tmp_rowsets_meta));
    }
    do {
        TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn:begin", &txn_id);
        // Create a readonly txn for scan tmp rowset
        std::unique_ptr<Transaction> txn;
        TxnErrorCode err = txn_kv_->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::CREATE>(err);
            ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
        DORIS_CLOUD_DEFER {
            if (txn == nullptr) return;
            stats.get_bytes += txn->get_bytes();
            stats.put_bytes += txn->put_bytes();
            stats.del_bytes += txn->delete_bytes();
            stats.get_counter += txn->num_get_keys();
            stats.put_counter += txn->num_put_keys();
            stats.del_counter += txn->num_del_keys();
        };

        // Get txn info with db_id and txn_id
        std::string info_val; // Will be reused when saving updated txn
        const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
        err = txn->get(info_key, &info_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
            } else {
                ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
                   << " err=" << err;
            }
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        TxnInfoPB txn_info;
        if (!txn_info.ParseFromString(info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        // TODO: do more check like txn state
        DCHECK(txn_info.txn_id() == txn_id);
        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
            code = MetaServiceCode::TXN_ALREADY_ABORTED;
            ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
            code = MetaServiceCode::OK;
            ss << "transaction is already visible: db_id=" << db_id << " txn_id=" << txn_id;
            msg = ss.str();
            LOG(INFO) << msg;
            response->mutable_txn_info()->CopyFrom(txn_info);
            return;
        }

        LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString();

        AnnotateTag txn_tag("txn_id", txn_id);

        bool is_versioned_write = is_version_write_enabled(instance_id);
        bool is_versioned_read = is_version_read_enabled(instance_id);
        CloneChainReader meta_reader(instance_id, resource_mgr_.get());

        // Prepare rowset meta and new_versions
        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
        std::vector<int64_t> acquired_tablet_ids;
        for (const auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
            for (const auto& [_, i] : tmp_rowsets_meta) {
                acquired_tablet_ids.push_back(i.tablet_id());
            }
        }
        if (!is_versioned_read) {
            // Read tablet indexes in batch.
            std::tie(code, msg) =
                    get_tablet_indexes(txn.get(), &tablet_ids, instance_id, acquired_tablet_ids);
            if (code != MetaServiceCode::OK) {
                return;
            }
        } else {
            TxnErrorCode err =
                    meta_reader.get_tablet_indexes(txn.get(), acquired_tablet_ids, &tablet_ids);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("failed to get tablet indexes, err={}", err);
                LOG_WARNING(msg);
                return;
            }
        }

        // {table/partition} -> version
        std::unordered_map<int64_t, int64_t> new_versions;
        std::unordered_map<int64_t, std::tuple<int64_t, int64_t>> partition_indexes;
        for (auto& [tablet_id, tablet_idx] : tablet_ids) {
            int64_t table_id = tablet_idx.table_id();
            int64_t partition_id = tablet_idx.partition_id();
            partition_indexes.insert({partition_id, {db_id, table_id}});
        }

        int64_t last_pending_txn_id = 0;
        if (!is_versioned_read) {
            std::tie(code, msg) = get_partition_versions(
                    txn.get(), &new_versions, &last_pending_txn_id, instance_id, partition_indexes);
            if (code != MetaServiceCode::OK) {
                return;
            }
        } else {
            std::vector<int64_t> partition_ids = to_container<std::vector<int64_t>>(
                    std::ranges::ref_view(partition_indexes) | std::ranges::views::keys);
            err = meta_reader.get_partition_versions(txn.get(), partition_ids, &new_versions,
                                                     &last_pending_txn_id);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("failed to get partition versions, err={}", err);
                LOG_WARNING(msg);
                return;
            }
        }

        if (last_pending_txn_id > 0) {
            stats.get_bytes += txn->get_bytes();
            stats.get_counter += txn->num_get_keys();
            txn.reset();
            TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::advance_last_pending_txn_id",
                                     &last_pending_txn_id);
            std::shared_ptr<TxnLazyCommitTask> task =
                    txn_lazy_committer_->submit(instance_id, last_pending_txn_id);

            std::tie(code, msg) = task->wait();
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "advance_last_txn failed last_txn=" << last_pending_txn_id
                             << " code=" << code << " msg=" << msg;
                return;
            }
            last_pending_txn_id = 0;
            continue;
        }

        CommitTxnLogPB commit_txn_log;
        commit_txn_log.set_txn_id(txn_id);
        commit_txn_log.set_db_id(db_id);

        int64_t rowsets_visible_ts_ms =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

        // <tablet_id, version> -> rowset meta
        std::vector<std::pair<std::tuple<int64_t, int64_t>, RowsetMetaCloudPB>> rowsets;
        std::unordered_map<int64_t, TabletStats> tablet_stats;    // tablet_id -> stats
        rowsets.reserve(sub_txn_to_tmp_rowsets_meta.size() * 10); // rough estimate
        for (const auto& sub_txn_info : sub_txn_infos) {
            auto sub_txn_id = sub_txn_info.sub_txn_id();
            auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
            std::unordered_map<int64_t, int64_t> partition_id_to_version;
            for (auto& [_, i] : tmp_rowsets_meta) {
                int64_t tablet_id = i.tablet_id();
                int64_t table_id = tablet_ids[tablet_id].table_id();
                int64_t partition_id = i.partition_id();
                if (new_versions.count(partition_id) == 0) [[unlikely]] {
                    // it is impossible.
                    code = MetaServiceCode::UNDEFINED_ERR;
                    ss << "failed to get partition version key, the target version not exists in "
                          "new_versions."
                       << " txn_id=" << txn_id << ", db_id=" << db_id << ", table_id=" << table_id
                       << ", partition_id=" << partition_id;
                    msg = ss.str();
                    LOG(ERROR) << msg;
                    return;
                }

                // Update rowset version
                int64_t new_version = new_versions[partition_id];
                if (partition_id_to_version.count(partition_id) == 0) {
                    new_versions[partition_id] = new_version + 1;
                    new_version = new_versions[partition_id];
                    partition_id_to_version[partition_id] = new_version;
                }
                i.set_start_version(new_version);
                i.set_end_version(new_version);
                i.set_visible_ts_ms(rowsets_visible_ts_ms);
                LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
                          << ", sub_txn_id=" << sub_txn_id << ", table_id=" << table_id
                          << ", partition_id=" << partition_id << ", tablet_id=" << tablet_id
                          << ", new_version=" << new_version;

                // Accumulate affected rows
                auto& stats = tablet_stats[tablet_id];
                stats.data_size += i.total_disk_size();
                stats.num_rows += i.num_rows();
                ++stats.num_rowsets;
                stats.num_segs += i.num_segments();
                stats.index_size += i.index_disk_size();
                stats.segment_size += i.data_disk_size();

                rowsets.emplace_back(std::make_tuple(tablet_id, i.end_version()), std::move(i));
                commit_txn_log.mutable_tablet_to_partition_map()->insert({tablet_id, partition_id});
            } // for tmp_rowsets_meta
        }

        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
        for (auto& [tablet_id, tablet_idx] : tablet_ids) {
            int64_t table_id = tablet_idx.table_id();
            table_id_tablet_ids[table_id].push_back(tablet_id);
        }
        process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
        if (code != MetaServiceCode::OK) {
            LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
            return;
        }

        // Save rowset meta
        for (auto& i : rowsets) {
            auto [tablet_id, version] = i.first;
            std::string rowset_key = meta_rowset_key({instance_id, tablet_id, version});
            std::string val;
            if (!i.second.SerializeToString(&val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }
            size_t rowset_size = rowset_key.size() + val.size();
            txn->put(rowset_key, val);
            LOG(INFO) << "put rowset_key=" << hex(rowset_key) << " txn_id=" << txn_id
                      << " rowset_size=" << rowset_size;

            if (is_versioned_write) {
                std::string versioned_rowset_key =
                        versioned::meta_rowset_load_key({instance_id, tablet_id, version});
                if (!versioned::document_put(txn.get(), versioned_rowset_key,
                                             std::move(i.second))) {
                    code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                    ss << "failed to put versioned rowset meta, txn_id=" << txn_id
                       << " key=" << hex(versioned_rowset_key);
                    msg = ss.str();
                    LOG(WARNING) << msg;
                    return;
                }
                LOG(INFO) << "put versioned rowset meta key=" << hex(versioned_rowset_key)
                          << ", txn_id=" << txn_id;
            }
        }

        // Save versions
        for (auto& [partition_id, new_version] : new_versions) {
            std::string ver_val;
            VersionPB version_pb;
            version_pb.set_version(new_version);
            if (!version_pb.SerializeToString(&ver_val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize version_pb when saving, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }

            auto [db_id, table_id] = partition_indexes[partition_id];
            std::string version_key =
                    partition_version_key({instance_id, db_id, table_id, partition_id});
            txn->put(version_key, ver_val);
            LOG(INFO) << "put partition_version_key=" << hex(version_key)
                      << " version:" << new_version << " txn_id=" << txn_id
                      << " partition_id=" << partition_id;

            VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
                       << " partition_id=" << partition_id << " version=" << new_version;

            if (is_versioned_write) {
                std::string partition_version_key =
                        versioned::partition_version_key({instance_id, partition_id});
                versioned_put(txn.get(), partition_version_key, ver_val);
                LOG(INFO) << "put versioned partition key=" << hex(partition_version_key)
                          << ", txn_id=" << txn_id;
            }
            commit_txn_log.mutable_partition_version_map()->insert({partition_id, new_version});

            response->add_table_ids(table_id);
            response->add_partition_ids(partition_id);
            response->add_versions(new_version);
        }

        // Save table versions
        for (auto& i : table_id_tablet_ids) {
            if (is_versioned_read) {
                // Read the table version, to build the operation log visible version range.
                TxnErrorCode err = meta_reader.get_table_version(txn.get(), i.first, nullptr, true);
                if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
                    code = cast_as<ErrCategory::READ>(err);
                    ss << "failed to get table version, err=" << err << " table_id=" << i.first;
                    msg = ss.str();
                    LOG(WARNING) << msg;
                    return;
                }
            }
            update_table_version(txn.get(), instance_id, db_id, i.first);
            commit_txn_log.add_table_ids(i.first);
        }

        LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();

        // Update txn_info
        txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);

        auto now_time = system_clock::now();
        uint64_t commit_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();
        if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = fmt::format("txn is expired, not allow to commit txn_id={}", txn_id);
            LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
                      << " timeout_ms=" << txn_info.timeout_ms() << " commit_time=" << commit_time;
            return;
        }
        txn_info.set_commit_time(commit_time);
        txn_info.set_finish_time(commit_time);
        if (request->has_commit_attachment()) {
            txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
        }
        txn_info.set_versioned_write(is_versioned_write);
        txn_info.set_versioned_read(is_versioned_read);

        LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
        info_val.clear();
        if (!txn_info.SerializeToString(&info_val)) {
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        txn->put(info_key, info_val);
        LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;

        // Batch get existing versioned tablet stats if needed
        std::unordered_map<int64_t, TabletStatsPB> existing_versioned_stats;
        if (is_versioned_write && !tablet_stats.empty()) {
            internal_get_load_tablet_stats_batch(code, msg, meta_reader, txn.get(), instance_id,
                                                 tablet_ids, &existing_versioned_stats);
            if (code != MetaServiceCode::OK) {
                LOG(WARNING) << "batch get versioned tablet stats failed, code=" << code
                             << " msg=" << msg << " txn_id=" << txn_id;
                return;
            }
            LOG(INFO) << "batch get " << existing_versioned_stats.size()
                      << " versioned tablet stats, txn_id=" << txn_id;
        }

        // Update stats of affected tablet
        for (auto& [tablet_id, stats] : tablet_stats) {
            DCHECK(tablet_ids.count(tablet_id));
            auto& tablet_idx = tablet_ids[tablet_id];
            StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                     tablet_idx.partition_id(), tablet_id};
            update_tablet_stats(info, stats, txn, code, msg);
            if (code != MetaServiceCode::OK) return;

            if (is_versioned_write) {
                TabletStatsPB stats_pb = existing_versioned_stats[tablet_id];
                merge_tablet_stats(stats_pb, stats);
                std::string stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
                if (!versioned::document_put(txn.get(), stats_key, std::move(stats_pb))) {
                    code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                    msg = "failed to serialize versioned tablet stats";
                    LOG(WARNING) << msg << " tablet_id=" << tablet_id << " txn_id=" << txn_id;
                    return;
                }
                LOG(INFO) << "put versioned tablet stats key=" << hex(stats_key)
                          << " tablet_id=" << tablet_id << " txn_id=" << txn_id;
            }
        }
        // Remove tmp rowset meta
        for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
            for (auto& [k, _] : tmp_rowsets_meta) {
                txn->remove(k);
                LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" << txn_id;
            }
        }

        const std::string running_key = txn_running_key({instance_id, db_id, txn_id});
        LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
        txn->remove(running_key);

        std::string recycle_val;
        std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
        RecycleTxnPB recycle_pb;
        recycle_pb.set_creation_time(commit_time);
        recycle_pb.set_label(txn_info.label());

        if (is_versioned_write) {
            commit_txn_log.mutable_recycle_txn()->Swap(&recycle_pb);
            std::string log_key = versioned::log_key({instance_id});
            OperationLogPB operation_log;
            if (is_versioned_read) {
                operation_log.set_min_timestamp(meta_reader.min_read_version());
            }
            operation_log.mutable_commit_txn()->Swap(&commit_txn_log);
            versioned::blob_put(txn.get(), log_key, operation_log);
            LOG(INFO) << "put commit txn operation log key=" << hex(recycle_key)
                      << " txn_id=" << txn_id;
        } else {
            if (!recycle_pb.SerializeToString(&recycle_val)) {
                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
                ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
                msg = ss.str();
                return;
            }
            txn->put(recycle_key, recycle_val);
            LOG(INFO) << "commit_txn put recycle_txn_key key=" << hex(recycle_key)
                      << " txn_id=" << txn_id;
        }

        LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
                  << " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys()
                  << " num_del_keys=" << txn->num_del_keys()
                  << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id;

        TEST_SYNC_POINT_RETURN_WITH_VOID("commit_txn_with_sub_txn::before_commit", &err, &code);
        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            if (err == TxnErrorCode::TXN_CONFLICT) {
                g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter << 1;
            }
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            return;
        }

        // calculate table stats from tablets stats
        std::map<int64_t /*table_id*/, TableStats> table_stats;
        std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
                                             request->base_tablet_ids().end());
        calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
        for (const auto& pair : table_stats) {
            TableStatsPB* stats_pb = response->add_table_stats();
            auto table_id = pair.first;
            auto stats = pair.second;
            get_pb_from_tablestats(stats, stats_pb);
            stats_pb->set_table_id(table_id);
            VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
                       << " table_id=" << table_id
                       << " updated_row_count=" << stats_pb->updated_row_count();
        }

        response->mutable_txn_info()->CopyFrom(txn_info);
        TEST_SYNC_POINT_CALLBACK("commit_txn_with_sub_txn::finish", &code);
        break;
    } while (true);
} // end commit_txn_with_sub_txn

// Fuzz-testing hook used by commit_txn.
//
// Draws an integer in [1, 100] from a fixed-seed Mersenne Twister and returns true when
// the draw does not exceed config::cloud_txn_lazy_commit_fuzzy_possibility.
//
// NOTE(review): the engine and distribution are function-local statics and are advanced
// without any locking in this function -- confirm whether concurrent callers are possible.
static bool force_txn_lazy_commit() {
    static std::mt19937 engine(20250806 /* seed */);
    static std::uniform_int_distribution<int> percent(1, 100);
    const auto roll = percent(engine);
    return roll <= config::cloud_txn_lazy_commit_fuzzy_possibility;
}

// RPC entry point for committing a load transaction.
//
// Flow:
//  1. Validate the request (txn id present, instance resolvable) and apply rate limiting.
//  2. Resolve the db id of the transaction via get_txn_db_id().
//  3. Requests flagged with is_txn_load are handed over to commit_txn_with_sub_txn().
//  4. Otherwise the tmp rowsets of the txn are scanned, then either
//     - commit_txn_immediately() is attempted, or
//     - commit_txn_eventually() is used when lazy commit is enabled and the number of tmp
//       rowsets exceeds config::txn_lazy_commit_rowsets_thresold, when the fuzzy-test hook
//       fires, or when the immediate commit failed with TXN_BYTES_TOO_LARGE while lazy
//       commit is enabled.
//
// The outcome is reported through `code` / `msg`, which are declared by RPC_PREPROCESS.
void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
                                 const CommitTxnRequest* request, CommitTxnResponse* response,
                                 ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(commit_txn, get, put, del);
    if (!request->has_txn_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid argument, missing txn id";
        return;
    }

    int64_t txn_id = request->txn_id();
    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << " txn_id=" << txn_id;
        return;
    }
    RPC_RATE_LIMIT(commit_txn)

    int64_t db_id;
    get_txn_db_id(txn_kv_.get(), instance_id, txn_id, code, msg, &db_id, &stats);
    if (code != MetaServiceCode::OK) {
        LOG(WARNING) << "get_txn_db_id failed, txn_id=" << txn_id << " code=" << code;
        return;
    }

    // Transactional loads carry sub txns and have a dedicated commit path.
    const bool is_txn_load = request->has_is_txn_load() && request->is_txn_load();
    if (is_txn_load) {
        commit_txn_with_sub_txn(request, response, code, msg, instance_id, db_id, stats);
        return;
    }

    std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>> tmp_rowsets_meta;
    scan_tmp_rowset(instance_id, txn_id, txn_kv_, code, msg, &tmp_rowsets_meta, &stats);
    if (code != MetaServiceCode::OK) {
        LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id << " code=" << code;
        return;
    }

    TxnErrorCode err = TxnErrorCode::TXN_OK;
    // Lazy commit requires an explicit non-2pc request that opts in, plus the server config.
    bool enable_txn_lazy_commit_feature =
            (request->has_is_2pc() && !request->is_2pc() && request->has_enable_txn_lazy_commit() &&
             request->enable_txn_lazy_commit() && config::enable_cloud_txn_lazy_commit);

    // The immediate path is tried when lazy commit is off, or when the txn is small enough.
    const bool try_immediate_commit =
            !enable_txn_lazy_commit_feature ||
            (tmp_rowsets_meta.size() <= config::txn_lazy_commit_rowsets_thresold);
    if (try_immediate_commit) {
        if (force_txn_lazy_commit()) {
            LOG(INFO) << "fuzzy test force_txn_lazy_commit, txn_id=" << txn_id
                      << " force_posibility=" << config::cloud_txn_lazy_commit_fuzzy_possibility;
        } else {
            commit_txn_immediately(request, response, code, msg, instance_id, db_id,
                                   tmp_rowsets_meta, err, stats);

            // Done on success; any failure other than "txn too large" is reported as is.
            if (MetaServiceCode::OK == code || TxnErrorCode::TXN_BYTES_TOO_LARGE != err) {
                return;
            }

            // From here on err is TXN_BYTES_TOO_LARGE.
            if (!enable_txn_lazy_commit_feature) {
                msg += ", likely due to committing too many tablets. "
                       "Please reduce the number of partitions involved in the load.";
                return;
            }

            DCHECK(code != MetaServiceCode::OK);
            DCHECK(enable_txn_lazy_commit_feature);
            DCHECK(err == TxnErrorCode::TXN_BYTES_TOO_LARGE);
            LOG(INFO) << "txn_id=" << txn_id << " fallthrough commit_txn_eventually";
        }
    }

    LOG(INFO) << "txn_id=" << txn_id << " commit_txn_eventually"
              << " tmp_rowsets_meta.size=" << tmp_rowsets_meta.size();
    // Discard any error left behind by the immediate attempt before retrying lazily.
    code = MetaServiceCode::OK;
    msg.clear();
    commit_txn_eventually(request, response, code, msg, instance_id, db_id, tmp_rowsets_meta,
                          stats);
}

static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* request,
                       Transaction* txn, TxnInfoPB& return_txn_info, std::stringstream& ss,
                       MetaServiceCode& code, std::string& msg) {
    int64_t txn_id = request->txn_id();
    std::string label = request->label();
    int64_t db_id = request->db_id();

    std::string info_key; // Will be used when saving updated txn
    std::string info_val; // Will be reused when saving updated txn
    TxnErrorCode err;
    //TODO: split with two function.
    //there two ways to abort txn:
    //1. abort txn by txn id
    //2. abort txn by label and db_id
    if (txn_id > 0) {
        VLOG_DEBUG << "abort_txn by txn_id, txn_id=" << txn_id;
        //abort txn by txn id
        // Get db id with txn id

        std::string index_key;
        std::string index_val;
        //not provide db_id, we need read from disk.
        if (db_id == 0) {
            index_key = txn_index_key({instance_id, txn_id});
            err = txn->get(index_key, &index_val);
            if (err != TxnErrorCode::TXN_OK) {
                code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                              : cast_as<ErrCategory::READ>(err);
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                    ss << "transaction [" << txn_id << "] not found";
                } else {
                    ss << "failed to get txn info, txn_id=" << txn_id << " err=" << err;
                }
                msg = ss.str();
                return;
            }

            TxnIndexPB index_pb;
            if (!index_pb.ParseFromString(index_val)) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "failed to parse txn_index_val"
                   << " txn_id=" << txn_id;
                msg = ss.str();
                return;
            }
            DCHECK(index_pb.has_tablet_index() == true);
            DCHECK(index_pb.tablet_index().has_db_id() == true);
            db_id = index_pb.tablet_index().db_id();
        }

        // Get txn info with db_id and txn_id
        info_key = txn_info_key({instance_id, db_id, txn_id});
        err = txn->get(info_key, &info_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                          : cast_as<ErrCategory::READ>(err);
            ss << "failed to get txn_info, db_id=" << db_id << "txn_id=" << txn_id << "err=" << err;
            msg = ss.str();
            return;
        }

        if (!return_txn_info.ParseFromString(info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_info db_id=" << db_id << "txn_id=" << txn_id;
            msg = ss.str();
            return;
        }

        DCHECK(return_txn_info.txn_id() == txn_id);

        //check state is valid.
        if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
            code = MetaServiceCode::TXN_ALREADY_ABORTED;
            ss << "transaction [" << txn_id << "] is already aborted, db_id=" << db_id;
            msg = ss.str();
            return;
        }
        if (return_txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
            code = MetaServiceCode::TXN_ALREADY_VISIBLE;
            ss << "transaction [" << txn_id << "] is already VISIBLE, db_id=" << db_id;
            msg = ss.str();
            return;
        }
    } else {
        VLOG_DEBUG << "abort_txn db_id and label, db_id=" << db_id << " label=" << label;
        //abort txn by label.
        std::string label_key = txn_label_key({instance_id, db_id, label});
        std::string label_val;
        err = txn->get(label_key, &label_val);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "txn->get() failed, label=" << label << " err=" << err;
            msg = ss.str();
            return;
        }

        //label index not exist
        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = MetaServiceCode::TXN_LABEL_NOT_FOUND;
            ss << "label not found, db_id=" << db_id << " label=" << label << " err=" << err;
            msg = ss.str();
            return;
        }

        TxnLabelPB label_pb;
        DCHECK(label_val.size() > VERSION_STAMP_LEN);
        if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "txn_label_pb->ParseFromString() failed, label=" << label;
            msg = ss.str();
            return;
        }

        int64_t prepare_txn_id = 0;
        //found prepare state txn for abort
        for (auto& cur_txn_id : label_pb.txn_ids()) {
            std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
            std::string cur_info_val;
            err = txn->get(cur_info_key, &cur_info_val);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                std::stringstream ss;
                ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " err=" << err;
                msg = ss.str();
                return;
            }
            // ret == 0
            TxnInfoPB cur_txn_info;
            if (!cur_txn_info.ParseFromString(cur_info_val)) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                std::stringstream ss;
                ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id;
                msg = ss.str();
                return;
            }
            VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
            //TODO: 2pc else need to check TxnStatusPB::TXN_STATUS_PRECOMMITTED
            if ((cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) ||
                (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PRECOMMITTED)) {
                prepare_txn_id = cur_txn_id;
                return_txn_info = std::move(cur_txn_info);
                info_key = std::move(cur_info_key);
                DCHECK_EQ(prepare_txn_id, return_txn_info.txn_id())
                        << "prepare_txn_id=" << prepare_txn_id
                        << " txn_id=" << return_txn_info.txn_id();
                break;
            }
        }

        if (prepare_txn_id == 0) {
            code = MetaServiceCode::TXN_INVALID_STATUS;
            std::stringstream ss;
            ss << "running transaction not found, db_id=" << db_id << " label=" << label;
            msg = ss.str();
            return;
        }
    }

    auto now_time = system_clock::now();
    uint64_t finish_time = duration_cast<milliseconds>(now_time.time_since_epoch()).count();

    // Update txn_info
    return_txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED);
    return_txn_info.set_finish_time(finish_time);
    request->has_reason() ? return_txn_info.set_reason(request->reason())
                          : return_txn_info.set_reason("User Abort");

    if (request->has_commit_attachment()) {
        return_txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
    }

    info_val.clear();
    if (!return_txn_info.SerializeToString(&info_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_info when saving, txn_id=" << return_txn_info.txn_id();
        msg = ss.str();
        return;
    }
    LOG(INFO) << "check watermark conflict, txn_info=" << return_txn_info.ShortDebugString();
    txn->put(info_key, info_val);
    LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << return_txn_info.txn_id();

    std::string running_key = txn_running_key({instance_id, db_id, return_txn_info.txn_id()});
    txn->remove(running_key);
    LOG(INFO) << "xxx remove running_key=" << hex(running_key)
              << " txn_id=" << return_txn_info.txn_id();

    std::string recycle_key = recycle_txn_key({instance_id, db_id, return_txn_info.txn_id()});
    std::string recycle_val;
    RecycleTxnPB recycle_pb;
    recycle_pb.set_creation_time(finish_time);
    recycle_pb.set_label(return_txn_info.label());

    if (!recycle_pb.SerializeToString(&recycle_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize recycle_pb, txn_id=" << return_txn_info.txn_id();
        msg = ss.str();
        return;
    }
    txn->put(recycle_key, recycle_val);
    LOG(INFO) << "put recycle_txn_key=" << hex(recycle_key)
              << " txn_id=" << return_txn_info.txn_id();
}

// RPC entry point for aborting a load transaction.
//
// The request must identify the transaction either by txn_id, or by db_id together with a
// label; otherwise INVALID_ARGUMENT is returned. The actual mutations are staged by
// _abort_txn() in a fresh KV transaction, which is committed here only when staging
// succeeded. On success the updated txn info is copied into the response.
//
// The outcome is reported through `code` / `msg`, which are declared by RPC_PREPROCESS.
void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
                                const AbortTxnRequest* request, AbortTxnResponse* response,
                                ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(abort_txn, get, put, del);
    // Get txn id
    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
    std::string label = request->has_label() ? request->label() : "";
    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
    if (txn_id < 0 && (label.empty() || db_id < 0)) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid txn id and label, db_id=" << db_id << " txn_id=" << txn_id
           << " label=" << label;
        msg = ss.str();
        return;
    }

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id) << " label=" << label
           << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    RPC_RATE_LIMIT(abort_txn);
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to txn_kv_->create_txn(), txn_id=" << txn_id << " label=" << label
           << " err=" << err;
        msg = ss.str();
        return;
    }
    TxnInfoPB txn_info;

    _abort_txn(instance_id, request, txn.get(), txn_info, ss, code, msg);
    // Do not commit (and do not report txn info) when staging the abort failed: the KV
    // transaction may already contain partial writes, and a commit failure would
    // otherwise overwrite the original error code.
    if (code != MetaServiceCode::OK) {
        return;
    }

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit kv txn, txn_id=" << txn_info.txn_id() << " err=" << err;
        msg = ss.str();
        return;
    }
    response->mutable_txn_info()->CopyFrom(txn_info);
}

// RPC entry point that returns the TxnInfoPB of a transaction.
//
// The transaction is located by txn_id and/or label:
//  - when a label is given, the label record is read and the largest txn id listed under
//    it (if greater than the requested txn_id) is used;
//  - when db_id is not given, it is read from the txn index of the resolved txn id.
// The txn info stored under (instance_id, db_id, txn_id) is then copied into the response.
//
// The outcome is reported through `code` / `msg`, which are declared by RPC_PREPROCESS.
void MetaServiceImpl::get_txn(::google::protobuf::RpcController* controller,
                              const GetTxnRequest* request, GetTxnResponse* response,
                              ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_txn, get);
    int64_t txn_id = -1;
    if (request->has_txn_id()) {
        txn_id = request->txn_id();
    }
    int64_t db_id = -1;
    if (request->has_db_id()) {
        db_id = request->db_id();
    }
    std::string label;
    if (request->has_label()) {
        label = request->label();
    }

    // At least one of txn_id / label is required.
    if (txn_id < 0 && label.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid txn_id, it may be not given or set properly, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
        msg = ss.str();
        return;
    }

    RPC_RATE_LIMIT(get_txn)
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        return;
    }

    if (!label.empty()) {
        // Step 1: read the label record.
        const std::string label_key = txn_label_key({instance_id, db_id, label});
        std::string label_val;
        err = txn->get(label_key, &label_val);
        const bool label_found = (err == TxnErrorCode::TXN_OK);
        if (!label_found && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "txn->get failed(), err=" << err << " label=" << label;
            msg = ss.str();
            return;
        }
        LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label
                  << " err=" << err;

        if (!label_found) {
            code = MetaServiceCode::TXN_ID_NOT_FOUND;
            ss << "Label [" << label << "] has not found";
            msg = ss.str();
            return;
        }

        // Step 2: decode the label record; the trailing VERSION_STAMP_LEN bytes are not
        // part of the protobuf payload.
        TxnLabelPB label_pb;
        const bool parsed =
                label_val.size() > VERSION_STAMP_LEN &&
                label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN);
        if (!parsed) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "label_pb->ParseFromString() failed, txn_id=" << txn_id << " label=" << label;
            msg = ss.str();
            return;
        }

        // Keep the largest txn id among the requested one and those under the label.
        for (auto& candidate_txn_id : label_pb.txn_ids()) {
            if (candidate_txn_id > txn_id) {
                txn_id = candidate_txn_id;
            }
        }
    }

    // db_id was not provided: look it up in the txn index.
    if (db_id < 0) {
        const std::string index_key = txn_index_key({instance_id, txn_id});
        std::string index_val;
        err = txn->get(index_key, &index_val);
        if (err != TxnErrorCode::TXN_OK) {
            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                code = MetaServiceCode::TXN_ID_NOT_FOUND;
            } else {
                code = cast_as<ErrCategory::READ>(err);
            }
            ss << "failed to get db id with txn_id=" << txn_id << " err=" << err;
            msg = ss.str();
            return;
        }

        TxnIndexPB index_pb;
        if (!index_pb.ParseFromString(index_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse txn_inf"
               << " txn_id=" << txn_id;
            msg = ss.str();
            return;
        }
        DCHECK(index_pb.has_tablet_index() == true);
        DCHECK(index_pb.tablet_index().has_db_id() == true);
        db_id = index_pb.tablet_index().db_id();
        if (db_id <= 0) {
            ss << "internal error: unexpected db_id " << db_id;
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = ss.str();
            return;
        }
    }

    // Fetch the txn info stored under (db_id, txn_id).
    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
    std::string info_val;
    err = txn->get(info_key, &info_val);
    if (err != TxnErrorCode::TXN_OK) {
        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = MetaServiceCode::TXN_ID_NOT_FOUND;
        } else {
            code = cast_as<ErrCategory::READ>(err);
        }
        ss << "failed to get db id with db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        return;
    }

    TxnInfoPB txn_info;
    if (!txn_info.ParseFromString(info_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse txn_info db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    VLOG_DEBUG << "txn_info=" << txn_info.ShortDebugString();
    DCHECK(txn_info.txn_id() == txn_id);
    response->mutable_txn_info()->CopyFrom(txn_info);
}

// To get current max txn id for schema change watermark etc.
//
// Opens a KV transaction, issues a read of a fixed key (the value itself is not used),
// obtains the read version of that transaction and reports `read_version << 10` as the
// current max txn id.
//
// The outcome is reported through `code` / `msg`, which are declared by RPC_PREPROCESS.
void MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController* controller,
                                             const GetCurrentMaxTxnRequest* request,
                                             GetCurrentMaxTxnResponse* response,
                                             ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_current_max_txn_id, get);
    // TODO: For auth
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
        return;
    }
    RPC_RATE_LIMIT(get_current_max_txn_id)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        code = cast_as<ErrCategory::CREATE>(err);
        return;
    }

    // Read a fixed key; a missing key is acceptable, only real read errors abort.
    const std::string probe_key = "schema change";
    std::string unused_val;
    err = txn->get(probe_key, &unused_val);
    const bool read_ok = (err == TxnErrorCode::TXN_OK || err == TxnErrorCode::TXN_KEY_NOT_FOUND);
    if (!read_ok) {
        code = cast_as<ErrCategory::READ>(err);
        std::stringstream err_ss;
        err_ss << "txn->get() failed, err=" << err;
        msg = err_ss.str();
        return;
    }

    int64_t read_version = 0;
    err = txn->get_read_version(&read_version);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        std::stringstream err_ss;
        err_ss << "get read version failed, ret=" << err;
        msg = err_ss.str();
        return;
    }

    int64_t current_max_txn_id = read_version << 10;
    VLOG_DEBUG << "read_version=" << read_version << " current_max_txn_id=" << current_max_txn_id;
    response->set_current_max_txn_id(current_max_txn_id);
}

/**
 * 1. Generate a sub_txn_id
 *
 * The following steps are done in a txn:
 * 2. Put txn_index_key in sub_txn_id
 * 3. Delete txn_label_key in sub_txn_id
 * 4. Modify the txn state of the txn_id:
 *    - Add the sub txn id to sub_txn_ids: the recycler uses it to recycle the txn_index_key
 *    - Add the table id to table_ids
 */
void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controller,
                                    const BeginSubTxnRequest* request,
                                    BeginSubTxnResponse* response,
                                    ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(begin_sub_txn, get, put, del);
    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
    int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1;
    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
    auto& table_ids = request->table_ids();
    std::string label = request->has_label() ? request->label() : "";
    if (txn_id < 0 || sub_txn_num < 0 || db_id < 0 || table_ids.empty() || label.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_num=" << sub_txn_num
           << " db_id=" << db_id << ", label=" << label << ", table_ids=[";
        for (auto table_id : table_ids) {
            ss << table_id << ", ";
        }
        ss << "]";
        msg = ss.str();
        return;
    }

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
        msg = ss.str();
        return;
    }

    RPC_RATE_LIMIT(begin_sub_txn)
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id
           << " db_id=" << db_id;
        msg = ss.str();
        return;
    }

    const std::string label_key = txn_label_key({instance_id, db_id, label});
    std::string label_val;
    err = txn->get(label_key, &label_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "txn->get failed(), err=" << err << " label=" << label;
        msg = ss.str();
        return;
    }

    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;

    // err == OK means this is a retry rpc?
    if (err == TxnErrorCode::TXN_OK) {
        label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
    }

    // ret > 0, means label not exist previously.
    txn->atomic_set_ver_value(label_key, label_val);
    LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "txn->commit failed(), label=" << label << " err=" << err;
        msg = ss.str();
        return;
    }
    stats.get_bytes += txn->get_bytes();
    stats.put_bytes += txn->put_bytes();
    stats.get_counter += txn->num_get_keys();
    stats.put_counter += txn->num_put_keys();

    // 2. Get sub txn id from version stamp
    txn.reset();
    err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "failed to create txn when get txn id, label=" << label << " err=" << err;
        msg = ss.str();
        return;
    }

    label_val.clear();
    err = txn->get(label_key, &label_val);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "txn->get() failed, label=" << label << " err=" << err;
        msg = ss.str();
        return;
    }

    LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label << " err=" << err;

    // Generated by TxnKv system
    int64_t sub_txn_id = 0;
    int ret =
            get_txn_id_from_fdb_ts(std::string_view(label_val).substr(
                                           label_val.size() - VERSION_STAMP_LEN, label_val.size()),
                                   &sub_txn_id);
    if (ret != 0) {
        code = MetaServiceCode::TXN_GEN_ID_ERR;
        ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" << ret;
        msg = ss.str();
        return;
    }

    LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id=" << sub_txn_id
              << " txn_id=" << txn_id << " label_val.size()=" << label_val.size();

    // write txn_index_key
    const std::string index_key = txn_index_key({instance_id, sub_txn_id});
    std::string index_val;
    TxnIndexPB index_pb;
    index_pb.mutable_tablet_index()->set_db_id(db_id);
    index_pb.set_parent_txn_id(txn_id);
    if (!index_pb.SerializeToString(&index_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_index_pb "
           << "label=" << label << " txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    // Get and update txn info with db_id and txn_id
    std::string info_val; // Will be reused when saving updated txn
    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
    err = txn->get(info_key, &info_val);
    if (err != TxnErrorCode::TXN_OK) {
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                      : cast_as<ErrCategory::READ>(err);
        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    TxnInfoPB txn_info;
    if (!txn_info.ParseFromString(info_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    DCHECK(txn_info.txn_id() == txn_id);
    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
        code = MetaServiceCode::TXN_INVALID_STATUS;
        ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id
           << " txn_id=" << txn_id;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    if (txn_info.sub_txn_ids().size() != sub_txn_num) {
        code = MetaServiceCode::UNDEFINED_ERR;
        ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", expected sub_txn_num=" << sub_txn_num
           << ", txn_info.sub_txn_ids=[";
        for (auto id : txn_info.sub_txn_ids()) {
            ss << id << ", ";
        }
        ss << "]";
        msg = ss.str();
        LOG(WARNING) << msg;
    }
    txn_info.mutable_sub_txn_ids()->Add(sub_txn_id);
    txn_info.mutable_table_ids()->Clear();
    for (auto table_id : table_ids) {
        txn_info.mutable_table_ids()->Add(table_id);
    }
    if (!txn_info.SerializeToString(&info_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
        msg = ss.str();
        return;
    }

    txn->remove(label_key);
    txn->put(info_key, info_val);
    txn->put(index_key, index_val);
    LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
              << ", remove label_key=" << hex(label_key) << ", put info_key=" << hex(info_key)
              << ", put index_key=" << hex(index_key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
        msg = ss.str();
        return;
    }
    response->set_sub_txn_id(sub_txn_id);
    response->mutable_txn_info()->CopyFrom(txn_info);
}

/**
 * Aborts a sub transaction of a parent transaction that is still PREPARED.
 *
 * 1. Modify the txn state of the txn_id:
 *    - Replace table_ids with the table ids carried by the request (the caller is expected
 *      to send the list without the aborted sub txn's table)
 *    - sub_txn_ids is left untouched
 *
 * Fails with INVALID_ARGUMENT when txn_id, sub_txn_id, sub_txn_num or db_id is missing, with
 * TXN_INVALID_STATUS when the parent txn is not PREPARED, and with UNDEFINED_ERR when the number
 * of recorded sub txns differs from request->sub_txn_num(). On success the response carries the
 * updated txn info.
 */
void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controller,
                                    const AbortSubTxnRequest* request,
                                    AbortSubTxnResponse* response,
                                    ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(abort_sub_txn, get, put);
    // Missing optional fields are mapped to -1 so that one check rejects all of them.
    int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
    int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() : -1;
    int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num() : -1;
    int64_t db_id = request->has_db_id() ? request->db_id() : -1;
    auto& table_ids = request->table_ids();
    if (txn_id < 0 || sub_txn_id < 0 || sub_txn_num < 0 || db_id < 0) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
           << ", sub_txn_num=" << sub_txn_num << " db_id=" << db_id << ", table_ids=[";
        for (auto table_id : table_ids) {
            ss << table_id << ", ";
        }
        ss << "]";
        msg = ss.str();
        return;
    }

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
        msg = ss.str();
        return;
    }

    RPC_RATE_LIMIT(abort_sub_txn)
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" << txn_id
           << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id;
        msg = ss.str();
        return;
    }

    // Get and update txn info with db_id and txn_id
    std::string info_val; // Will be reused when saving updated txn
    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
    err = txn->get(info_key, &info_val);
    if (err != TxnErrorCode::TXN_OK) {
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND
                                                      : cast_as<ErrCategory::READ>(err);
        ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
           << " sub_txn_id=" << sub_txn_id << " err=" << err;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    TxnInfoPB txn_info;
    if (!txn_info.ParseFromString(info_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id
           << " sub_txn_id=" << sub_txn_id;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    DCHECK(txn_info.txn_id() == txn_id);
    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
        code = MetaServiceCode::TXN_INVALID_STATUS;
        ss << "transaction status is " << txn_info.status() << " : db_id=" << db_id
           << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id;
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }

    // remove table_id and does not need to remove sub_txn_id
    if (txn_info.sub_txn_ids().size() != sub_txn_num) {
        code = MetaServiceCode::UNDEFINED_ERR;
        ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
           << ", expected sub_txn_num=" << sub_txn_num << ", txn_info.sub_txn_ids=[";
        for (auto id : txn_info.sub_txn_ids()) {
            ss << id << ", ";
        }
        ss << "]";
        msg = ss.str();
        LOG(WARNING) << msg;
        // The request is rejected: do not rewrite the txn info while reporting an error.
        return;
    }
    // table_ids is replaced by the caller-provided list, not merged with the old one.
    txn_info.mutable_table_ids()->Clear();
    for (auto table_id : table_ids) {
        txn_info.mutable_table_ids()->Add(table_id);
    }
    // TODO should we try to delete txn_label_key if begin_sub_txn failed to delete?

    if (!txn_info.SerializeToString(&info_val)) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize txn_info when saving, txn_id=" << txn_id
           << " sub_txn_id=" << sub_txn_id;
        msg = ss.str();
        return;
    }

    txn->put(info_key, info_val);
    LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
              << ", put info_key=" << hex(info_key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "failed to commit kv txn, txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
           << ", err=" << err;
        msg = ss.str();
        return;
    }
    response->mutable_txn_info()->CopyFrom(txn_info);
}

/**
 * Aborts every PREPARED transaction that was started by an older incarnation of a BE
 * coordinator.
 *
 * Scans all txn_info keys of the instance and, for each txn whose status is PREPARED and whose
 * coordinator is a BE with the same id and ip as the request but an earlier start_time, calls
 * _abort_txn() on the shared kv txn. The kv txn is committed once at the end, and only when at
 * least one abort was issued.
 *
 * Fails with INVALID_ARGUMENT when id, ip or start_time is missing or the instance cannot be
 * resolved.
 */
void MetaServiceImpl::abort_txn_with_coordinator(::google::protobuf::RpcController* controller,
                                                 const AbortTxnWithCoordinatorRequest* request,
                                                 AbortTxnWithCoordinatorResponse* response,
                                                 ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(abort_txn_with_coordinator, get);
    if (!request->has_id() || !request->has_ip() || !request->has_start_time()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid coordinate id, coordinate ip or coordinate start time.";
        return;
    }
    // TODO: For auth
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
        msg = ss.str();
        return;
    }
    RPC_RATE_LIMIT(abort_txn_with_coordinator);
    // Scan range covering every (db_id, txn_id) pair of this instance.
    std::string begin_info_key = txn_info_key({instance_id, 0, 0});
    std::string end_info_key = txn_info_key({instance_id, INT64_MAX, INT64_MAX});
    LOG(INFO) << "begin_info_key:" << hex(begin_info_key) << " end_info_key:" << hex(end_info_key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        code = cast_as<ErrCategory::CREATE>(err);
        return;
    }
    std::unique_ptr<RangeGetIterator> it;
    int64_t abort_txn_cnt = 0;
    int64_t total_iteration_cnt = 0;
    bool need_commit = false;
    do {
        err = txn->get(begin_info_key, end_info_key, &it, true);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get txn info. err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        while (it->has_next()) {
            total_iteration_cnt++;
            auto [k, v] = it->next();
            VLOG_DEBUG << "check txn info txn_info_key=" << hex(k);
            TxnInfoPB info_pb;
            if (!info_pb.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "malformed txn running info";
                msg = ss.str();
                // The key is only added to the log line, not to the message sent back.
                ss << " key=" << hex(k);
                LOG(WARNING) << ss.str();
                return;
            }
            const auto& coordinate = info_pb.coordinator();
            if (info_pb.status() == TxnStatusPB::TXN_STATUS_PREPARED &&
                coordinate.sourcetype() == TXN_SOURCE_TYPE_BE && coordinate.id() == request->id() &&
                coordinate.ip() == request->ip() &&
                coordinate.start_time() < request->start_time()) {
                need_commit = true;
                TxnInfoPB return_txn_info;
                // Named distinctly so it does not shadow the RPC `request` parameter.
                AbortTxnRequest abort_req;
                abort_req.set_db_id(info_pb.db_id());
                abort_req.set_txn_id(info_pb.txn_id());
                abort_req.set_label(info_pb.label());
                abort_req.set_reason("Abort because coordinate be restart/stop");
                // NOTE(review): the outcome reported through `code`/`msg` is not checked here;
                // the scan continues regardless - confirm this is intended.
                _abort_txn(instance_id, &abort_req, txn.get(), return_txn_info, ss, code, msg);
                ++abort_txn_cnt;
            }
            if (!it->has_next()) {
                // Remember the last key of this batch to resume the scan after it.
                begin_info_key = k;
            }
        }
        begin_info_key.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());
    LOG(INFO) << "abort txn count: " << abort_txn_cnt
              << " total iteration count: " << total_iteration_cnt;
    if (need_commit) {
        err = txn->commit();
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::COMMIT>(err);
            ss << "failed to abort txn kv, coordinate_id=" << request->id()
               << " coordinate_ip=" << request->ip()
               << " coordinate_start_time=" << request->start_time() << " err=" << err;
            msg = ss.str();
            return;
        }
    }
}

/**
 * Builds the txn_info key that corresponds to an encoded txn_running key.
 *
 * The running key is decoded after skipping its first byte; the decoded fields at positions
 * 1, 3 and 4 are used as instance id, db id and txn id respectively.
 *
 * @param txn_running_key encoded txn_running key
 * @return the txn_info key, or an empty string when the key is empty, cannot be decoded, or
 *         does not have the expected shape
 */
std::string get_txn_info_key_from_txn_running_key(std::string_view txn_running_key) {
    // Keep the untouched key for diagnostics: decode_key() receives a pointer to the view and
    // may advance it while decoding.
    const std::string_view original_key = txn_running_key;
    std::string conflict_txn_info_key;
    if (txn_running_key.empty()) [[unlikely]] {
        // remove_prefix(1) on an empty view would be undefined behaviour.
        LOG(WARNING) << "failed to decode key, txn running key is empty";
        return conflict_txn_info_key;
    }

    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
    txn_running_key.remove_prefix(1);
    int ret = decode_key(&txn_running_key, &out);
    if (ret != 0) [[unlikely]] {
        // decode version key error means this is something wrong,
        // we can not continue this txn
        LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" << hex(original_key);
        return conflict_txn_info_key;
    }

    DCHECK(out.size() == 5) << " key=" << hex(original_key) << " " << out.size();
    // DCHECK is compiled out in release builds, so guard the indexing below explicitly.
    if (out.size() != 5) [[unlikely]] {
        LOG(WARNING) << "unexpected field count in txn running key, size=" << out.size()
                     << " key=" << hex(original_key);
        return conflict_txn_info_key;
    }

    // get_if avoids std::bad_variant_access when a field has an unexpected type.
    const auto* decode_instance_id = std::get_if<std::string>(&std::get<0>(out[1]));
    const auto* db_id = std::get_if<int64_t>(&std::get<0>(out[3]));
    const auto* txn_id = std::get_if<int64_t>(&std::get<0>(out[4]));
    if (decode_instance_id == nullptr || db_id == nullptr || txn_id == nullptr) [[unlikely]] {
        LOG(WARNING) << "unexpected field type in txn running key, key=" << hex(original_key);
        return conflict_txn_info_key;
    }

    conflict_txn_info_key = txn_info_key({*decode_instance_id, *db_id, *txn_id});
    return conflict_txn_info_key;
}

/**
 * Reports the running transactions of a db that touch any of the requested tables.
 *
 * Scans the txn_running keys of request->db_id() between txn id 0 and request->end_txn_id().
 * Entries whose timeout_time is already in the past are only counted; every other entry that
 * shares at least one table id with request->table_ids() marks the check as not finished and,
 * when its txn_info key can be derived, has its txn info appended to
 * response->conflict_txns(). response->finished() is true only when no such entry was seen.
 *
 * Fails with INVALID_ARGUMENT when db_id, end_txn_id or table_ids is missing or the instance
 * cannot be resolved.
 */
void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* controller,
                                         const CheckTxnConflictRequest* request,
                                         CheckTxnConflictResponse* response,
                                         ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(check_txn_conflict, get);
    const bool has_required_args = request->has_db_id() && request->has_end_txn_id() &&
                                   request->table_ids_size() > 0;
    if (!has_required_args) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid db id, end txn id or table_ids.";
        return;
    }
    // TODO: For auth
    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id=";
        if (cloud_unique_id.empty()) {
            ss << "(empty)";
        } else {
            ss << cloud_unique_id;
        }
        msg = ss.str();
        return;
    }
    RPC_RATE_LIMIT(check_txn_conflict)
    int64_t db_id = request->db_id();

    std::string scan_begin_key = txn_running_key({instance_id, db_id, 0});
    std::string scan_end_key = txn_running_key({instance_id, db_id, request->end_txn_id()});
    LOG(INFO) << "begin_running_key:" << hex(scan_begin_key)
              << " end_running_key:" << hex(scan_end_key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        msg = "failed to create txn";
        code = cast_as<ErrCategory::CREATE>(err);
        return;
    }

    //TODO: use set to replace
    std::vector<int64_t> wanted_table_ids(request->table_ids().begin(),
                                          request->table_ids().end());
    std::sort(wanted_table_ids.begin(), wanted_table_ids.end());

    // True when the running txn writes at least one of the requested tables.
    auto touches_wanted_tables = [&wanted_table_ids](const TxnRunningPB& running) {
        std::vector<int64_t> running_table_ids(running.table_ids().begin(),
                                               running.table_ids().end());
        std::sort(running_table_ids.begin(), running_table_ids.end());
        std::vector<int64_t> common(std::min(running_table_ids.size(), wanted_table_ids.size()));
        auto common_end = std::set_intersection(wanted_table_ids.begin(), wanted_table_ids.end(),
                                                running_table_ids.begin(),
                                                running_table_ids.end(), common.begin());
        return common_end != common.begin();
    };

    std::unique_ptr<RangeGetIterator> iter;
    int64_t skip_timeout_txn_cnt = 0;
    int total_iteration_cnt = 0;
    bool no_conflict = true;
    do {
        err = txn->get(scan_begin_key, scan_end_key, &iter, true);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "failed to get txn running info. err=" << err;
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }

        VLOG_DEBUG << "begin_running_key=" << hex(scan_begin_key)
                   << " end_running_key=" << hex(scan_end_key)
                   << " it->has_next()=" << iter->has_next();

        // One timestamp per batch is used to decide which running txns have timed out.
        const uint64_t check_time =
                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
        while (iter->has_next()) {
            total_iteration_cnt++;
            auto [k, v] = iter->next();
            if (!iter->has_next()) {
                // Last key of this batch: the next range read resumes right after it.
                scan_begin_key = k;
            }
            LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k);
            TxnRunningPB running_pb;
            if (!running_pb.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "malformed txn running info";
                msg = ss.str();
                // The key is only added to the log line, not to the message sent back.
                ss << " key=" << hex(k);
                LOG(WARNING) << ss.str();
                return;
            }

            if (running_pb.timeout_time() < check_time) {
                skip_timeout_txn_cnt++;
                continue;
            }

            LOG(INFO) << "check watermark conflict range_get txn_run_key=" << hex(k) << " " << k
                      << " running_pb=" << running_pb.ShortDebugString();
            if (!touches_wanted_tables(running_pb)) {
                continue;
            }

            no_conflict = false;
            std::string conflict_txn_info_key = get_txn_info_key_from_txn_running_key(k);
            if (conflict_txn_info_key.empty()) {
                continue;
            }

            std::string conflict_txn_info_val;
            err = txn->get(conflict_txn_info_key, &conflict_txn_info_val);
            if (err != TxnErrorCode::TXN_OK) {
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                    code = MetaServiceCode::TXN_ID_NOT_FOUND;
                } else {
                    code = cast_as<ErrCategory::READ>(err);
                }
                ss << "failed to get txn_info, conflict_txn_info_key="
                   << hex(conflict_txn_info_key);
                msg = ss.str();
                LOG(WARNING) << msg;
                return;
            }
            // Parse straight into a new response entry.
            TxnInfoPB& conflict_txn_info = *response->add_conflict_txns();
            if (!conflict_txn_info.ParseFromString(conflict_txn_info_val)) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                ss << "failed to parse txn_info, conflict_txn_info_key="
                   << hex(conflict_txn_info_key);
                msg = ss.str();
                LOG(WARNING) << msg;
                return;
            }
        }
        scan_begin_key.push_back('\x00'); // Update to next smallest key for iteration
    } while (iter->more());
    LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt
              << " conflict txn count: " << response->conflict_txns_size()
              << " total iteration count: " << total_iteration_cnt;
    response->set_finished(no_conflict);
}

/**
 * @brief Cleans the txn keys referenced by one txn label within a single kv transaction.
 *
 * For every txn id listed in the label value, the txn info is loaded. Txns that are ABORTED or
 * VISIBLE get their index, info and recycle keys removed; all others are kept. When every
 * listed txn was cleaned the label key itself is removed, otherwise the label is rewritten
 * with the surviving txn ids only. A missing label key, or a value that holds nothing but a
 * version stamp, is treated as nothing to do.
 *
 * @param txn_kv kv store used to create the transaction
 * @param instance_id instance the keys belong to
 * @param db_id database the label belongs to
 * @param label_key encoded txn label key to clean
 * @param stats accumulator that receives the kv txn's byte and key counters on exit
 * @param is_versioned_write when true, the debug-only check that the recycle key exists is
 *        skipped
 * @return TxnErrorCode TXN_OK on success or when there is nothing to clean, the kv error of
 *         the failing create/get/commit call, or TXN_UNIDENTIFIED_ERROR on a protobuf
 *         parse/serialize failure
 */
TxnErrorCode internal_clean_label(std::shared_ptr<TxnKv> txn_kv, const std::string_view instance_id,
                                  int64_t db_id, const std::string_view label_key, KVStats& stats,
                                  bool is_versioned_write) {
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id
                     << " label_key=" << hex(label_key);
        return err;
    }
    // Report this txn's traffic to the caller on every exit path below.
    DORIS_CLOUD_DEFER {
        if (txn == nullptr) return;
        stats.get_bytes += txn->get_bytes();
        stats.put_bytes += txn->put_bytes();
        stats.del_bytes += txn->delete_bytes();
        stats.get_counter += txn->num_get_keys();
        stats.put_counter += txn->num_put_keys();
        stats.del_counter += txn->num_del_keys();
    };

    std::string raw_label;
    err = txn->get(label_key, &raw_label);
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        LOG(INFO) << "txn get err=" << err << " db_id=" << db_id << " label_key=" << hex(label_key);
        return TxnErrorCode::TXN_OK;
    }
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to txn get err=" << err << " db_id=" << db_id
                     << " label_key=" << hex(label_key);
        return err;
    }

    // Nothing precedes the trailing version stamp: there is no label payload to process.
    if (raw_label.size() <= VERSION_STAMP_LEN) {
        LOG(INFO) << "label_val.size()=" << raw_label.size() << " db_id=" << db_id
                  << " label_key=" << hex(label_key);
        return TxnErrorCode::TXN_OK;
    }

    TxnLabelPB label_pb;
    if (!label_pb.ParseFromArray(raw_label.data(), raw_label.size() - VERSION_STAMP_LEN)) {
        LOG(WARNING) << "failed to parse txn label"
                     << " db_id=" << db_id << " label_key=" << hex(label_key)
                     << " label_val.size()=" << raw_label.size();
        return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
    }

    // Sizes are only collected for the diagnostic line emitted when the commit fails.
    int64_t key_size = 0;
    int64_t val_size = 0;
    std::vector<int64_t> pending_txn_ids;
    std::vector<int64_t> removed_txn_ids;

    for (auto txn_id : label_pb.txn_ids()) {
        const std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
        const std::string index_key = txn_index_key({instance_id, txn_id});
        const std::string info_key = txn_info_key({instance_id, db_id, txn_id});

        std::string info_val;
        err = txn->get(info_key, &info_val);
        if (err != TxnErrorCode::TXN_OK) {
            LOG_WARNING("info_key get failed")
                    .tag("info_key", hex(info_key))
                    .tag("label_key", hex(label_key))
                    .tag("db_id", db_id)
                    .tag("txn_id", txn_id)
                    .tag("err", err);
            return err;
        }

        TxnInfoPB txn_info;
        if (!txn_info.ParseFromString(info_val)) {
            LOG_WARNING("info_val parse failed")
                    .tag("info_key", hex(info_key))
                    .tag("label_key", hex(label_key))
                    .tag("db_id", db_id)
                    .tag("txn_id", txn_id)
                    .tag("size", info_val.size());
            return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
        }

        const bool in_final_state = txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED ||
                                    txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE;
        std::string recycle_val;
        if (!in_final_state) {
            // txn status is not final status
            LOG(INFO) << "txn not final state, label_key=" << hex(label_key)
                      << " txn_id=" << txn_id;
            pending_txn_ids.push_back(txn_id);
            DCHECK_EQ(txn->get(recycle_key, &recycle_val), TxnErrorCode::TXN_KEY_NOT_FOUND);
            continue;
        }

        // In versioned write, the recycle key will be write only when the txn operation log is recycled.
        if (!is_versioned_write) {
            DCHECK_EQ(txn->get(recycle_key, &recycle_val), TxnErrorCode::TXN_OK);
        }
        DCHECK((txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) ||
               (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE));

        txn->remove(index_key);
        txn->remove(info_key);
        txn->remove(recycle_key);
        key_size += index_key.size();
        key_size += info_key.size();
        key_size += recycle_key.size();
        removed_txn_ids.push_back(txn_id);
        LOG(INFO) << "remove index_key=" << hex(index_key) << " info_key=" << hex(info_key)
                  << " recycle_key=" << hex(recycle_key);
    }

    const bool every_txn_removed = label_pb.txn_ids().size() == removed_txn_ids.size();
    if (every_txn_removed) {
        // No txn references the label any more, so the label key can go as well.
        txn->remove(label_key);
        key_size += label_key.size();
        LOG(INFO) << "remove label_key=" << hex(label_key);
    } else {
        // Keep the label but make it list only the txns that are still alive.
        label_pb.clear_txn_ids();
        for (auto txn_id : pending_txn_ids) {
            label_pb.add_txn_ids(txn_id);
        }
        LOG(INFO) << "rewrite label_pb=" << label_pb.ShortDebugString();
        raw_label.clear();
        if (!label_pb.SerializeToString(&raw_label)) {
            LOG(INFO) << "failed to serialize label_pb=" << label_pb.ShortDebugString()
                      << " label_key=" << hex(label_key);
            return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
        }
        txn->atomic_set_ver_value(label_key, raw_label);
        key_size += label_key.size();
        val_size += raw_label.size();
    }

    err = txn->commit();
    TEST_SYNC_POINT_CALLBACK("internal_clean_label:err", &err);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(INFO) << fmt::format(
                "label_key={} key_size={} val_size={} label_pb={} clean_txn_ids={}", hex(label_key),
                key_size, val_size, label_pb.ShortDebugString(), fmt::join(removed_txn_ids, " "));
    }
    return err;
}

// RPC handler that cleans transaction labels of one database.
//
// Behavior, as implemented below:
//  - `db_id` is mandatory; the instance is resolved from `cloud_unique_id`. A missing
//    db id or an unknown instance yields INVALID_ARGUMENT.
//  - When the request carries no labels, all label keys in
//    [txn_label_key(db_id, ""), txn_label_key(db_id + 1, "")) are scanned in batches and
//    each key is handed to internal_clean_label().
//  - Otherwise every label listed in the request is cleaned via internal_clean_label().
//  - The first failure stops processing and is reported through `code` / `msg`.
//
// NOTE(review): `code`, `msg`, `ss`, `instance_id` and `stats` are not declared in this
// function; presumably RPC_PREPROCESS introduces them and arranges for the response to be
// finished on return -- confirm against the macro definition.
void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* controller,
                                      const CleanTxnLabelRequest* request,
                                      CleanTxnLabelResponse* response,
                                      ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(clean_txn_label, get, put, del);
    if (!request->has_db_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "missing db id";
        LOG(WARNING) << msg;
        return;
    }

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    RPC_RATE_LIMIT(clean_txn_label)
    const int64_t db_id = request->db_id();

    bool is_versioned_write = is_version_write_enabled(instance_id);

    // clean label only by db_id
    if (request->labels().empty()) {
        // Scan window; `begin_label_key` is advanced after every batch.
        std::string begin_label_key = txn_label_key({instance_id, db_id, ""});
        const std::string end_label_key = txn_label_key({instance_id, db_id + 1, ""});

        std::unique_ptr<RangeGetIterator> it;
        bool snapshot = true;
        int limit = 1000;
        TEST_SYNC_POINT_CALLBACK("clean_txn_label:limit", &limit);
        do {
            // A fresh transaction is created for every batch of the range scan.
            std::unique_ptr<Transaction> txn;
            auto err = txn_kv_->create_txn(&txn);
            if (err != TxnErrorCode::TXN_OK) {
                msg = "failed to create txn";
                code = cast_as<ErrCategory::CREATE>(err);
                LOG(INFO) << msg << " err=" << err << " begin=" << hex(begin_label_key)
                          << " end=" << hex(end_label_key);
                return;
            }
            // Account the read volume of this batch in the RPC stats when the scope exits.
            DORIS_CLOUD_DEFER {
                if (txn == nullptr) return;
                stats.get_bytes += txn->get_bytes();
                stats.get_counter += txn->num_get_keys();
            };

            err = txn->get(begin_label_key, end_label_key, &it, snapshot, limit);
            if (err != TxnErrorCode::TXN_OK) {
                msg = fmt::format("failed to txn range get, err={}", err);
                code = cast_as<ErrCategory::READ>(err);
                LOG(WARNING) << msg << " begin=" << hex(begin_label_key)
                             << " end=" << hex(end_label_key);
                return;
            }

            if (!it->has_next()) {
                LOG(INFO) << "no keys in the range. begin=" << hex(begin_label_key)
                          << " end=" << hex(end_label_key);
                break;
            }
            while (it->has_next()) {
                auto [k, v] = it->next();
                if (!it->has_next()) {
                    // Remember the last key of this batch so the next batch resumes after it.
                    begin_label_key = k;
                    LOG(INFO) << "iterator has no more kvs. key=" << hex(k);
                }
                err = internal_clean_label(txn_kv_, instance_id, db_id, k, stats,
                                           is_versioned_write);
                if (err != TxnErrorCode::TXN_OK) {
                    code = cast_as<ErrCategory::READ>(err);
                    msg = fmt::format("failed to clean txn label. err={}", err);
                    LOG(WARNING) << msg << " db_id=" << db_id;
                    return;
                }
            }
            // Appending '\x00' yields the smallest key strictly greater than the last one seen.
            begin_label_key.push_back('\x00');
        } while (it->more());
    } else {
        // `labels` is a repeated field: clean every requested label, not only the first one.
        for (const std::string& label : request->labels()) {
            const std::string label_key = txn_label_key({instance_id, db_id, label});
            const TxnErrorCode err = internal_clean_label(txn_kv_, instance_id, db_id, label_key,
                                                          stats, is_versioned_write);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("failed to clean txn label. err={}", err);
                LOG(WARNING) << msg << " db_id=" << db_id << " label_key=" << hex(label_key);
                return;
            }
        }
    }

    code = MetaServiceCode::OK;
}

// Get txn id by label.
// 1. When the requested status list is not empty, return the first txn id recorded under
//    the label whose status equals any one of the requested statuses.
// 2. When the requested status list is empty, return the latest txn id, i.e. the last
//    entry recorded under the label.
//
// Error reporting (through `code` / `msg`):
//  - INVALID_ARGUMENT: the request has no db id, or the instance cannot be resolved.
//  - TXN_ID_NOT_FOUND: the label has no value, holds no txn ids, a referenced txn info
//    key is missing, or no txn matches the requested statuses.
//  - PROTOBUF_PARSE_ERR: the label value or a txn info value cannot be parsed.
//  - KV create/read failures are mapped with cast_as<>().
//
// NOTE(review): `code`, `msg`, `ss`, `instance_id` and `txn` are not declared in this
// function; presumably RPC_PREPROCESS introduces them -- confirm against the macro.
void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
                                 const GetTxnIdRequest* request, GetTxnIdResponse* response,
                                 ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_txn_id, get);
    if (!request->has_db_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "missing db id";
        LOG(WARNING) << msg;
        return;
    }

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    RPC_RATE_LIMIT(get_txn_id)
    const int64_t db_id = request->db_id();
    std::string label = request->label();
    const std::string label_key = txn_label_key({instance_id, db_id, label});

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id
                     << " label=" << label;
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
           << " db_id=" << db_id;
        msg = ss.str();
        return;
    }

    std::string label_val;
    err = txn->get(label_key, &label_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "txn->get failed(), err=" << err << " label=" << label;
        msg = ss.str();
        return;
    }

    // A missing key leaves `label_val` empty; a value that is not longer than the trailing
    // version stamp carries no TxnLabelPB payload. Both cases mean "no txn for this label".
    if (label_val.size() <= VERSION_STAMP_LEN) {
        code = MetaServiceCode::TXN_ID_NOT_FOUND;
        ss << "transaction not found, label=" << label;
        // Publish the message like every other error path does; it used to be left empty.
        msg = ss.str();
        return;
    }

    TxnLabelPB label_pb;
    // label_val.size() > VERSION_STAMP_LEN means label has previous txn ids.
    // The last VERSION_STAMP_LEN bytes are excluded from the protobuf payload.
    if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "label_pb->ParseFromString() failed, label=" << label;
        msg = ss.str();
        return;
    }
    if (label_pb.txn_ids_size() == 0) {
        code = MetaServiceCode::TXN_ID_NOT_FOUND;
        ss << "transaction not found, label=" << label;
        msg = ss.str();
        return;
    }

    // find the latest txn
    if (request->txn_status_size() == 0) {
        response->set_txn_id(*label_pb.txn_ids().rbegin());
        return;
    }

    // Walk the txn ids in stored order and return the first whose status is requested.
    for (auto& cur_txn_id : label_pb.txn_ids()) {
        const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
        std::string cur_info_val;
        err = txn->get(cur_info_key, &cur_info_val);
        if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
               << " err=" << err;
            msg = ss.str();
            return;
        }

        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            // The label references a txn id whose txn info is missing: the label entry and
            // the txn info are inconsistent.
            code = MetaServiceCode::TXN_ID_NOT_FOUND;
            ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
               << " err=" << err;
            msg = ss.str();
            return;
        }

        TxnInfoPB cur_txn_info;
        if (!cur_txn_info.ParseFromString(cur_info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id
               << " label=" << label << " err=" << err;
            msg = ss.str();
            return;
        }

        VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
        for (auto txn_status : request->txn_status()) {
            if (cur_txn_info.status() == txn_status) {
                response->set_txn_id(cur_txn_id);
                return;
            }
        }
    }

    // None of the txns under this label is in a requested status.
    code = MetaServiceCode::TXN_ID_NOT_FOUND;
    ss << "transaction not found, label=" << label;
    msg = ss.str();
}

// Emits one INFO log line summarizing the KV footprint of a txn commit and feeds the
// partition/tablet counts into the global and the per-instance bvar recorders.
//
// txn             KV transaction whose key counters and byte counter are reported
// instance_id     instance the commit belongs to; used as the per-instance bvar label
// partition_count number of partitions reported for the commit
// tablet_count    number of tablets reported for the commit
// txn_id          transaction id, only used in the log line
void record_txn_commit_stats(doris::cloud::Transaction* txn, const std::string& instance_id,
                             int64_t partition_count, int64_t tablet_count, int64_t txn_id) {
    // Keys written, deleted and read by the KV transaction, added up.
    const int64_t touched_keys =
            txn->num_put_keys() + txn->num_del_keys() + txn->num_get_keys();
    // NOTE(review): only get_bytes() is reported as kv_bytes while kv_count also covers
    // put/del keys -- confirm that this asymmetry is intended.
    const int64_t touched_bytes = txn->get_bytes();

    LOG(INFO) << "txn commit stats, instance_id: " << instance_id << ", txn_id: " << txn_id
              << ", kv_count: " << touched_keys << ", kv_bytes: " << touched_bytes
              << ", partition_count: " << partition_count << ", tablet_count: " << tablet_count;

    // Partition-count recorders: global first, then per instance.
    g_bvar_ms_txn_commit_with_partition_count << partition_count;
    g_bvar_instance_txn_commit_with_partition_count.put({instance_id}, partition_count);

    // Tablet-count recorders: global first, then per instance.
    g_bvar_ms_txn_commit_with_tablet_count << tablet_count;
    g_bvar_instance_txn_commit_with_tablet_count.put({instance_id}, tablet_count);
}

} // namespace doris::cloud
